Merge remote-tracking branch 'origin/feature/attr_tskv_version' into feature/entities-version

This commit is contained in:
ViacheslavKlimov 2024-07-09 13:19:02 +03:00
commit 6190c30662
10 changed files with 77 additions and 65 deletions

View File

@ -41,7 +41,7 @@ public class CaffeineTbCacheTransaction<K extends Serializable, V extends Serial
private final Map<Object, Object> pendingPuts = new LinkedHashMap<>(); private final Map<Object, Object> pendingPuts = new LinkedHashMap<>();
@Override @Override
public void putIfAbsent(K key, V value) { public void put(K key, V value) {
pendingPuts.put(key, value); pendingPuts.put(key, value);
} }

View File

@ -17,6 +17,7 @@ package org.thingsboard.server.cache;
import lombok.Getter; import lombok.Getter;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import org.springframework.cache.Cache;
import org.springframework.cache.CacheManager; import org.springframework.cache.CacheManager;
import java.io.Serializable; import java.io.Serializable;
@ -26,6 +27,7 @@ import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
@ -34,17 +36,22 @@ import java.util.concurrent.locks.ReentrantLock;
@RequiredArgsConstructor @RequiredArgsConstructor
public abstract class CaffeineTbTransactionalCache<K extends Serializable, V extends Serializable> implements TbTransactionalCache<K, V> { public abstract class CaffeineTbTransactionalCache<K extends Serializable, V extends Serializable> implements TbTransactionalCache<K, V> {
private final CacheManager cacheManager;
@Getter @Getter
private final String cacheName; protected final String cacheName;
protected final Cache cache;
private final Lock lock = new ReentrantLock(); protected final Lock lock = new ReentrantLock();
private final Map<K, Set<UUID>> objectTransactions = new HashMap<>(); private final Map<K, Set<UUID>> objectTransactions = new HashMap<>();
private final Map<UUID, CaffeineTbCacheTransaction<K, V>> transactions = new HashMap<>(); private final Map<UUID, CaffeineTbCacheTransaction<K, V>> transactions = new HashMap<>();
public CaffeineTbTransactionalCache(CacheManager cacheManager, String cacheName) {
this.cacheName = cacheName;
this.cache = Optional.ofNullable(cacheManager.getCache(cacheName))
.orElseThrow(() -> new IllegalArgumentException("Cache '" + cacheName + "' is not configured"));
}
@Override @Override
public TbCacheValueWrapper<V> get(K key) { public TbCacheValueWrapper<V> get(K key) {
return SimpleTbCacheValueWrapper.wrap(cacheManager.getCache(cacheName).get(key)); return SimpleTbCacheValueWrapper.wrap(cache.get(key));
} }
@Override @Override
@ -52,7 +59,7 @@ public abstract class CaffeineTbTransactionalCache<K extends Serializable, V ext
lock.lock(); lock.lock();
try { try {
failAllTransactionsByKey(key); failAllTransactionsByKey(key);
cacheManager.getCache(cacheName).put(key, value); cache.put(key, value);
} finally { } finally {
lock.unlock(); lock.unlock();
} }
@ -110,11 +117,11 @@ public abstract class CaffeineTbTransactionalCache<K extends Serializable, V ext
} }
void doPutIfAbsent(Object key, Object value) { void doPutIfAbsent(Object key, Object value) {
cacheManager.getCache(cacheName).putIfAbsent(key, value); cache.putIfAbsent(key, value);
} }
void doEvict(K key) { void doEvict(K key) {
cacheManager.getCache(cacheName).evict(key); cache.evict(key);
} }
TbCacheTransaction<K, V> newTransaction(List<K> keys) { TbCacheTransaction<K, V> newTransaction(List<K> keys) {
@ -181,7 +188,7 @@ public abstract class CaffeineTbTransactionalCache<K extends Serializable, V ext
} }
} }
private void failAllTransactionsByKey(K key) { protected void failAllTransactionsByKey(K key) {
Set<UUID> transactionsIds = objectTransactions.get(key); Set<UUID> transactionsIds = objectTransactions.get(key);
if (transactionsIds != null) { if (transactionsIds != null) {
for (UUID otherTrId : transactionsIds) { for (UUID otherTrId : transactionsIds) {

View File

@ -18,7 +18,6 @@ package org.thingsboard.server.cache;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.RedisConnection; import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.RedisStringCommands;
import java.io.Serializable; import java.io.Serializable;
import java.util.Objects; import java.util.Objects;
@ -31,8 +30,8 @@ public class RedisTbCacheTransaction<K extends Serializable, V extends Serializa
private final RedisConnection connection; private final RedisConnection connection;
@Override @Override
public void putIfAbsent(K key, V value) { public void put(K key, V value) {
cache.put(connection, key, value, RedisStringCommands.SetOption.UPSERT); cache.put(key, value, connection);
} }
@Override @Override

View File

@ -103,10 +103,14 @@ public abstract class RedisTbTransactionalCache<K extends Serializable, V extend
@Override @Override
public void put(K key, V value) { public void put(K key, V value) {
try (var connection = connectionFactory.getConnection()) { try (var connection = connectionFactory.getConnection()) {
put(connection, key, value, RedisStringCommands.SetOption.UPSERT); put(key, value, connection);
} }
} }
public void put(K key, V value, RedisConnection connection) {
put(connection, key, value, RedisStringCommands.SetOption.UPSERT);
}
@Override @Override
public void putIfAbsent(K key, V value) { public void putIfAbsent(K key, V value) {
try (var connection = connectionFactory.getConnection()) { try (var connection = connectionFactory.getConnection()) {

View File

@ -17,7 +17,7 @@ package org.thingsboard.server.cache;
public interface TbCacheTransaction<K, V> { public interface TbCacheTransaction<K, V> {
void putIfAbsent(K key, V value); void put(K key, V value);
boolean commit(); boolean commit();

View File

@ -68,7 +68,7 @@ public interface TbTransactionalCache<K extends Serializable, V extends Serializ
try { try {
V dbValue = dbCall.get(); V dbValue = dbCall.get();
if (dbValue != null || cacheNullValue) { if (dbValue != null || cacheNullValue) {
cacheTransaction.putIfAbsent(key, dbValue); cacheTransaction.put(key, dbValue);
cacheTransaction.commit(); cacheTransaction.commit();
return dbValue; return dbValue;
} else { } else {
@ -104,7 +104,7 @@ public interface TbTransactionalCache<K extends Serializable, V extends Serializ
try { try {
R dbValue = dbCall.get(); R dbValue = dbCall.get();
if (dbValue != null || cacheNullValue) { if (dbValue != null || cacheNullValue) {
cacheTransaction.putIfAbsent(key, dbValueToCacheValue.apply(dbValue)); cacheTransaction.put(key, dbValueToCacheValue.apply(dbValue));
cacheTransaction.commit(); cacheTransaction.commit();
return dbValue; return dbValue;
} else { } else {

View File

@ -15,23 +15,18 @@
*/ */
package org.thingsboard.server.cache; package org.thingsboard.server.cache;
import lombok.RequiredArgsConstructor;
import org.springframework.cache.Cache; import org.springframework.cache.Cache;
import org.springframework.cache.CacheManager; import org.springframework.cache.CacheManager;
import org.thingsboard.server.common.data.HasVersion; import org.thingsboard.server.common.data.HasVersion;
import org.thingsboard.server.common.data.util.TbPair; import org.thingsboard.server.common.data.util.TbPair;
import java.io.Serializable; import java.io.Serializable;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@RequiredArgsConstructor public abstract class VersionedCaffeineTbCache<K extends Serializable, V extends Serializable & HasVersion> extends CaffeineTbTransactionalCache<K, V> implements VersionedTbCache<K, V> {
public abstract class VersionedCaffeineTbCache<K extends Serializable, V extends Serializable & HasVersion> implements VersionedTbCache<K, V> {
private final CacheManager cacheManager; public VersionedCaffeineTbCache(CacheManager cacheManager, String cacheName) {
private final String cacheName; super(cacheManager, cacheName);
}
private final Lock lock = new ReentrantLock();
@Override @Override
public TbCacheValueWrapper<V> get(K key) { public TbCacheValueWrapper<V> get(K key) {
@ -57,7 +52,8 @@ public abstract class VersionedCaffeineTbCache<K extends Serializable, V extends
try { try {
TbPair<Long, V> versionValuePair = doGet(key); TbPair<Long, V> versionValuePair = doGet(key);
if (versionValuePair == null || version > versionValuePair.getFirst()) { if (versionValuePair == null || version > versionValuePair.getFirst()) {
cacheManager.getCache(cacheName).put(key, TbPair.of(version, value)); failAllTransactionsByKey(key);
cache.put(key, TbPair.of(version, value));
} }
} finally { } finally {
lock.unlock(); lock.unlock();
@ -65,7 +61,7 @@ public abstract class VersionedCaffeineTbCache<K extends Serializable, V extends
} }
private TbPair<Long, V> doGet(K key) { private TbPair<Long, V> doGet(K key) {
Cache.ValueWrapper source = cacheManager.getCache(cacheName).get(key); Cache.ValueWrapper source = cache.get(key);
return source == null ? null : (TbPair<Long, V>) source.get(); return source == null ? null : (TbPair<Long, V>) source.get();
} }
@ -73,7 +69,8 @@ public abstract class VersionedCaffeineTbCache<K extends Serializable, V extends
public void evict(K key) { public void evict(K key) {
lock.lock(); lock.lock();
try { try {
cacheManager.getCache(cacheName).evict(key); failAllTransactionsByKey(key);
cache.evict(key);
} finally { } finally {
lock.unlock(); lock.unlock();
} }
@ -84,11 +81,7 @@ public abstract class VersionedCaffeineTbCache<K extends Serializable, V extends
if (version == null) { if (version == null) {
return; return;
} }
lock.lock(); put(key, null, version);
try {
put(key, null, version);
} finally {
lock.unlock();
}
} }
} }

View File

@ -29,7 +29,6 @@ import org.thingsboard.server.common.data.HasVersion;
import java.io.Serializable; import java.io.Serializable;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.List;
@Slf4j @Slf4j
public abstract class VersionedRedisTbCache<K extends Serializable, V extends Serializable & HasVersion> extends RedisTbTransactionalCache<K, V> implements VersionedTbCache<K, V> { public abstract class VersionedRedisTbCache<K extends Serializable, V extends Serializable & HasVersion> extends RedisTbTransactionalCache<K, V> implements VersionedTbCache<K, V> {
@ -47,7 +46,7 @@ public abstract class VersionedRedisTbCache<K extends Serializable, V extends Se
local newValueWithVersion = struct.pack(">I8", newVersion) .. newValue local newValueWithVersion = struct.pack(">I8", newVersion) .. newValue
redis.call('SET', key, newValueWithVersion, 'EX', expiration) redis.call('SET', key, newValueWithVersion, 'EX', expiration)
end end
local function bytes_to_number(bytes) local function bytes_to_number(bytes)
local n = 0 local n = 0
for i = 1, 8 do for i = 1, 8 do
@ -102,33 +101,44 @@ public abstract class VersionedRedisTbCache<K extends Serializable, V extends Se
@Override @Override
public void put(K key, V value, Long version) { public void put(K key, V value, Long version) {
log.trace("put [{}][{}][{}]", key, value, version);
doPut(key, value, version, cacheTtl); doPut(key, value, version, cacheTtl);
} }
@Override
public void put(K key, V value, RedisConnection connection) {
Long version = value != null ? value.getVersion() : 0;
byte[] rawKey = getRawKey(key);
doPut(rawKey, value, version, cacheTtl, connection);
}
private void doPut(K key, V value, Long version, Expiration expiration) { private void doPut(K key, V value, Long version, Expiration expiration) {
log.trace("put [{}][{}][{}]", key, value, version);
if (version == null) { if (version == null) {
return; return;
} }
final byte[] rawKey = getRawKey(key); final byte[] rawKey = getRawKey(key);
try (var connection = getConnection(rawKey)) {
doPut(rawKey, value, version, expiration, connection);
}
}
private void doPut(byte[] rawKey, V value, Long version, Expiration expiration, RedisConnection connection) {
byte[] rawValue = getRawValue(value); byte[] rawValue = getRawValue(value);
byte[] rawVersion = StringRedisSerializer.UTF_8.serialize(String.valueOf(version)); byte[] rawVersion = StringRedisSerializer.UTF_8.serialize(String.valueOf(version));
byte[] rawExpiration = StringRedisSerializer.UTF_8.serialize(String.valueOf(expiration.getExpirationTimeInSeconds())); byte[] rawExpiration = StringRedisSerializer.UTF_8.serialize(String.valueOf(expiration.getExpirationTimeInSeconds()));
try (var connection = getConnection(rawKey)) { try {
connection.scriptingCommands().evalSha(SET_VERSIONED_VALUE_SHA, ReturnType.VALUE, 1, rawKey, rawValue, rawVersion, rawExpiration);
} catch (InvalidDataAccessApiUsageException e) {
log.debug("loading LUA [{}]", connection.getNativeConnection());
String sha = connection.scriptingCommands().scriptLoad(SET_VERSIONED_VALUE_LUA_SCRIPT);
if (!Arrays.equals(SET_VERSIONED_VALUE_SHA, StringRedisSerializer.UTF_8.serialize(sha))) {
log.error("SHA for SET_VERSIONED_VALUE_LUA_SCRIPT wrong! Expected [{}], but actual [{}]", new String(SET_VERSIONED_VALUE_SHA), sha);
}
try { try {
connection.scriptingCommands().evalSha(SET_VERSIONED_VALUE_SHA, ReturnType.VALUE, 1, rawKey, rawValue, rawVersion, rawExpiration); connection.scriptingCommands().evalSha(SET_VERSIONED_VALUE_SHA, ReturnType.VALUE, 1, rawKey, rawValue, rawVersion, rawExpiration);
} catch (InvalidDataAccessApiUsageException e) { } catch (InvalidDataAccessApiUsageException ignored) {
log.debug("loading LUA [{}]", connection.getNativeConnection()); log.debug("Slowly executing eval instead of fast evalsha");
String sha = connection.scriptingCommands().scriptLoad(SET_VERSIONED_VALUE_LUA_SCRIPT); connection.scriptingCommands().eval(SET_VERSIONED_VALUE_LUA_SCRIPT, ReturnType.VALUE, 1, rawKey, rawValue, rawVersion, rawExpiration);
if (!Arrays.equals(SET_VERSIONED_VALUE_SHA, StringRedisSerializer.UTF_8.serialize(sha))) {
log.error("SHA for SET_VERSIONED_VALUE_LUA_SCRIPT wrong! Expected [{}], but actual [{}]", new String(SET_VERSIONED_VALUE_SHA), sha);
}
try {
connection.scriptingCommands().evalSha(SET_VERSIONED_VALUE_SHA, ReturnType.VALUE, 1, rawKey, rawValue, rawVersion, rawExpiration);
} catch (InvalidDataAccessApiUsageException ignored) {
log.debug("Slowly executing eval instead of fast evalsha");
connection.scriptingCommands().eval(SET_VERSIONED_VALUE_LUA_SCRIPT, ReturnType.VALUE, 1, rawKey, rawValue, rawVersion, rawExpiration);
}
} }
} }
} }
@ -143,7 +153,6 @@ public abstract class VersionedRedisTbCache<K extends Serializable, V extends Se
@Override @Override
public void putIfAbsent(K key, V value) { public void putIfAbsent(K key, V value) {
log.trace("putIfAbsent [{}][{}]", key, value);
throw new NotImplementedException("putIfAbsent is not supported by versioned cache"); throw new NotImplementedException("putIfAbsent is not supported by versioned cache");
} }
@ -157,14 +166,4 @@ public abstract class VersionedRedisTbCache<K extends Serializable, V extends Se
throw new NotImplementedException("evictOrPut is not supported by versioned cache"); throw new NotImplementedException("evictOrPut is not supported by versioned cache");
} }
@Override
public TbCacheTransaction<K, V> newTransactionForKey(K key) {
throw new NotImplementedException("newTransactionForKey is not supported by versioned cache");
}
@Override
public TbCacheTransaction<K, V> newTransactionForKeys(List<K> keys) {
throw new NotImplementedException("newTransactionForKeys is not supported by versioned cache");
}
} }

View File

@ -19,7 +19,7 @@ import org.thingsboard.server.common.data.HasVersion;
import java.io.Serializable; import java.io.Serializable;
public interface VersionedTbCache<K extends Serializable, V extends Serializable & HasVersion> { public interface VersionedTbCache<K extends Serializable, V extends Serializable & HasVersion> extends TbTransactionalCache<K, V> {
TbCacheValueWrapper<V> get(K key); TbCacheValueWrapper<V> get(K key);
@ -30,4 +30,5 @@ public interface VersionedTbCache<K extends Serializable, V extends Serializable
void evict(K key); void evict(K key);
void evict(K key, Long version); void evict(K key, Long version);
} }

View File

@ -63,8 +63,8 @@ cache.specs.attributes.maxSize=100000
cache.specs.tsLatest.timeToLiveInMinutes=1440 cache.specs.tsLatest.timeToLiveInMinutes=1440
cache.specs.tsLatest.maxSize=100000 cache.specs.tsLatest.maxSize=100000
cache.specs.tokensOutdatageTime.timeToLiveInMinutes=1440 cache.specs.userSessionsInvalidation.timeToLiveInMinutes=1440
cache.specs.tokensOutdatageTime.maxSize=100000 cache.specs.userSessionsInvalidation.maxSize=10000
cache.specs.otaPackages.timeToLiveInMinutes=1440 cache.specs.otaPackages.timeToLiveInMinutes=1440
cache.specs.otaPackages.maxSize=100000 cache.specs.otaPackages.maxSize=100000
@ -93,6 +93,15 @@ cache.specs.resourceInfo.maxSize=10000
cache.specs.alarmTypes.timeToLiveInMinutes=60 cache.specs.alarmTypes.timeToLiveInMinutes=60
cache.specs.alarmTypes.maxSize=10000 cache.specs.alarmTypes.maxSize=10000
cache.specs.userSettings.timeToLiveInMinutes=1440
cache.specs.userSettings.maxSize=10000
cache.specs.mobileAppSettings.timeToLiveInMinutes=1440
cache.specs.mobileAppSettings.maxSize=10000
cache.specs.mobileSecretKey.timeToLiveInMinutes=1440
cache.specs.mobileSecretKey.maxSize=10000
redis.connection.host=localhost redis.connection.host=localhost
redis.connection.port=6379 redis.connection.port=6379
redis.connection.db=0 redis.connection.db=0