From cf821515fb215a9a6fb911f8a896019dacdc7413 Mon Sep 17 00:00:00 2001 From: ViacheslavKlimov Date: Tue, 9 Jul 2024 13:17:25 +0300 Subject: [PATCH] Refactoring for versioned caches --- .../cache/CaffeineTbCacheTransaction.java | 2 +- .../cache/CaffeineTbTransactionalCache.java | 25 +++++---- .../server/cache/RedisTbCacheTransaction.java | 5 +- .../cache/RedisTbTransactionalCache.java | 6 ++- .../server/cache/TbCacheTransaction.java | 2 +- .../server/cache/TbTransactionalCache.java | 4 +- .../cache/VersionedCaffeineTbCache.java | 29 ++++------ .../server/cache/VersionedRedisTbCache.java | 53 +++++++++---------- .../server/cache/VersionedTbCache.java | 3 +- .../resources/application-test.properties | 13 ++++- 10 files changed, 77 insertions(+), 65 deletions(-) diff --git a/common/cache/src/main/java/org/thingsboard/server/cache/CaffeineTbCacheTransaction.java b/common/cache/src/main/java/org/thingsboard/server/cache/CaffeineTbCacheTransaction.java index 54465b0b50..03d9099541 100644 --- a/common/cache/src/main/java/org/thingsboard/server/cache/CaffeineTbCacheTransaction.java +++ b/common/cache/src/main/java/org/thingsboard/server/cache/CaffeineTbCacheTransaction.java @@ -41,7 +41,7 @@ public class CaffeineTbCacheTransaction pendingPuts = new LinkedHashMap<>(); @Override - public void putIfAbsent(K key, V value) { + public void put(K key, V value) { pendingPuts.put(key, value); } diff --git a/common/cache/src/main/java/org/thingsboard/server/cache/CaffeineTbTransactionalCache.java b/common/cache/src/main/java/org/thingsboard/server/cache/CaffeineTbTransactionalCache.java index 9c01b47b88..e0a1457d72 100644 --- a/common/cache/src/main/java/org/thingsboard/server/cache/CaffeineTbTransactionalCache.java +++ b/common/cache/src/main/java/org/thingsboard/server/cache/CaffeineTbTransactionalCache.java @@ -17,6 +17,7 @@ package org.thingsboard.server.cache; import lombok.Getter; import lombok.RequiredArgsConstructor; +import org.springframework.cache.Cache; import org.springframework.cache.CacheManager; import java.io.Serializable; @@ -26,6 +27,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.UUID; import java.util.concurrent.locks.Lock; @@ -34,17 +36,22 @@ import java.util.concurrent.locks.ReentrantLock; @RequiredArgsConstructor public abstract class CaffeineTbTransactionalCache implements TbTransactionalCache { - private final CacheManager cacheManager; @Getter - private final String cacheName; - - private final Lock lock = new ReentrantLock(); + protected final String cacheName; + protected final Cache cache; + protected final Lock lock = new ReentrantLock(); private final Map> objectTransactions = new HashMap<>(); private final Map> transactions = new HashMap<>(); + public CaffeineTbTransactionalCache(CacheManager cacheManager, String cacheName) { + this.cacheName = cacheName; + this.cache = Optional.ofNullable(cacheManager.getCache(cacheName)) + .orElseThrow(() -> new IllegalArgumentException("Cache '" + cacheName + "' is not configured")); + } + @Override public TbCacheValueWrapper get(K key) { - return SimpleTbCacheValueWrapper.wrap(cacheManager.getCache(cacheName).get(key)); + return SimpleTbCacheValueWrapper.wrap(cache.get(key)); } @Override @@ -52,7 +59,7 @@ public abstract class CaffeineTbTransactionalCache newTransaction(List keys) { @@ -181,7 +188,7 @@ public abstract class CaffeineTbTransactionalCache transactionsIds = objectTransactions.get(key); if (transactionsIds != null) { for (UUID otherTrId : transactionsIds) { diff --git a/common/cache/src/main/java/org/thingsboard/server/cache/RedisTbCacheTransaction.java b/common/cache/src/main/java/org/thingsboard/server/cache/RedisTbCacheTransaction.java index 0cb2d661db..fb852493ce 100644 --- a/common/cache/src/main/java/org/thingsboard/server/cache/RedisTbCacheTransaction.java +++ b/common/cache/src/main/java/org/thingsboard/server/cache/RedisTbCacheTransaction.java @@ -18,7 +18,6 @@ package org.thingsboard.server.cache; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.data.redis.connection.RedisConnection; -import org.springframework.data.redis.connection.RedisStringCommands; import java.io.Serializable; import java.util.Objects; @@ -31,8 +30,8 @@ public class RedisTbCacheTransaction { - void putIfAbsent(K key, V value); + void put(K key, V value); boolean commit(); diff --git a/common/cache/src/main/java/org/thingsboard/server/cache/TbTransactionalCache.java b/common/cache/src/main/java/org/thingsboard/server/cache/TbTransactionalCache.java index be0b38b65e..7507ef99ff 100644 --- a/common/cache/src/main/java/org/thingsboard/server/cache/TbTransactionalCache.java +++ b/common/cache/src/main/java/org/thingsboard/server/cache/TbTransactionalCache.java @@ -68,7 +68,7 @@ public interface TbTransactionalCache implements VersionedTbCache { +public abstract class VersionedCaffeineTbCache extends CaffeineTbTransactionalCache implements VersionedTbCache { - private final CacheManager cacheManager; - private final String cacheName; - - private final Lock lock = new ReentrantLock(); + public VersionedCaffeineTbCache(CacheManager cacheManager, String cacheName) { + super(cacheManager, cacheName); + } @Override public TbCacheValueWrapper get(K key) { @@ -57,7 +52,8 @@ public abstract class VersionedCaffeineTbCache versionValuePair = doGet(key); if (versionValuePair == null || version > versionValuePair.getFirst()) { - cacheManager.getCache(cacheName).put(key, TbPair.of(version, value)); + failAllTransactionsByKey(key); + cache.put(key, TbPair.of(version, value)); } } finally { lock.unlock(); @@ -65,7 +61,7 @@ public abstract class VersionedCaffeineTbCache doGet(K key) { - Cache.ValueWrapper source = cacheManager.getCache(cacheName).get(key); + Cache.ValueWrapper source = cache.get(key); return source == null ? null : (TbPair) source.get(); } @@ -73,7 +69,8 @@ public abstract class VersionedCaffeineTbCache extends RedisTbTransactionalCache implements VersionedTbCache { @@ -47,7 +46,7 @@ public abstract class VersionedRedisTbCacheI8", newVersion) .. newValue redis.call('SET', key, newValueWithVersion, 'EX', expiration) end - + local function bytes_to_number(bytes) local n = 0 for i = 1, 8 do @@ -102,33 +101,44 @@ public abstract class VersionedRedisTbCache newTransactionForKey(K key) { - throw new NotImplementedException("newTransactionForKey is not supported by versioned cache"); - } - - @Override - public TbCacheTransaction newTransactionForKeys(List keys) { - throw new NotImplementedException("newTransactionForKeys is not supported by versioned cache"); - } - } diff --git a/common/cache/src/main/java/org/thingsboard/server/cache/VersionedTbCache.java b/common/cache/src/main/java/org/thingsboard/server/cache/VersionedTbCache.java index aaec7a47be..8fcfcff8d7 100644 --- a/common/cache/src/main/java/org/thingsboard/server/cache/VersionedTbCache.java +++ b/common/cache/src/main/java/org/thingsboard/server/cache/VersionedTbCache.java @@ -19,7 +19,7 @@ import org.thingsboard.server.common.data.HasVersion; import java.io.Serializable; -public interface VersionedTbCache { +public interface VersionedTbCache extends TbTransactionalCache { TbCacheValueWrapper get(K key); @@ -30,4 +30,5 @@ public interface VersionedTbCache