Merge remote-tracking branch 'origin/feature/attr_tskv_version' into feature/entities-version
This commit is contained in:
commit
4302b63fd9
@ -38,7 +38,7 @@ public class CaffeineTbCacheTransaction<K extends Serializable, V extends Serial
|
|||||||
@Setter
|
@Setter
|
||||||
private boolean failed;
|
private boolean failed;
|
||||||
|
|
||||||
private final Map<Object, Object> pendingPuts = new LinkedHashMap<>();
|
private final Map<K, V> pendingPuts = new LinkedHashMap<>();
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void put(K key, V value) {
|
public void put(K key, V value) {
|
||||||
|
|||||||
@ -116,7 +116,7 @@ public abstract class CaffeineTbTransactionalCache<K extends Serializable, V ext
|
|||||||
return newTransaction(keys);
|
return newTransaction(keys);
|
||||||
}
|
}
|
||||||
|
|
||||||
void doPutIfAbsent(Object key, Object value) {
|
void doPutIfAbsent(K key, V value) {
|
||||||
cache.putIfAbsent(key, value);
|
cache.putIfAbsent(key, value);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -139,7 +139,7 @@ public abstract class CaffeineTbTransactionalCache<K extends Serializable, V ext
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean commit(UUID trId, Map<Object, Object> pendingPuts) {
|
public boolean commit(UUID trId, Map<K, V> pendingPuts) {
|
||||||
lock.lock();
|
lock.lock();
|
||||||
try {
|
try {
|
||||||
var tr = transactions.get(trId);
|
var tr = transactions.get(trId);
|
||||||
|
|||||||
@ -53,7 +53,7 @@ public abstract class VersionedCaffeineTbCache<K extends Serializable, V extends
|
|||||||
TbPair<Long, V> versionValuePair = doGet(key);
|
TbPair<Long, V> versionValuePair = doGet(key);
|
||||||
if (versionValuePair == null || version > versionValuePair.getFirst()) {
|
if (versionValuePair == null || version > versionValuePair.getFirst()) {
|
||||||
failAllTransactionsByKey(key);
|
failAllTransactionsByKey(key);
|
||||||
cache.put(key, TbPair.of(version, value));
|
cache.put(key, wrapValue(value, version));
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
lock.unlock();
|
lock.unlock();
|
||||||
@ -84,4 +84,13 @@ public abstract class VersionedCaffeineTbCache<K extends Serializable, V extends
|
|||||||
put(key, null, version);
|
put(key, null, version);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
void doPutIfAbsent(K key, V value) {
|
||||||
|
cache.putIfAbsent(key, wrapValue(value, value != null ? value.getVersion() : 0));
|
||||||
|
}
|
||||||
|
|
||||||
|
private TbPair<Long, V> wrapValue(V value, Long version) {
|
||||||
|
return TbPair.of(version, value);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -28,7 +28,6 @@ import org.thingsboard.server.common.data.HasVersion;
|
|||||||
|
|
||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collection;
|
|
||||||
|
|
||||||
@Slf4j
|
@Slf4j
|
||||||
public abstract class VersionedRedisTbCache<K extends Serializable, V extends Serializable & HasVersion> extends RedisTbTransactionalCache<K, V> implements VersionedTbCache<K, V> {
|
public abstract class VersionedRedisTbCache<K extends Serializable, V extends Serializable & HasVersion> extends RedisTbTransactionalCache<K, V> implements VersionedTbCache<K, V> {
|
||||||
@ -156,11 +155,6 @@ public abstract class VersionedRedisTbCache<K extends Serializable, V extends Se
|
|||||||
throw new NotImplementedException("putIfAbsent is not supported by versioned cache");
|
throw new NotImplementedException("putIfAbsent is not supported by versioned cache");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public void evict(Collection<K> keys) {
|
|
||||||
throw new NotImplementedException("evict by many keys is not supported by versioned cache");
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void evictOrPut(K key, V value) {
|
public void evictOrPut(K key, V value) {
|
||||||
throw new NotImplementedException("evictOrPut is not supported by versioned cache");
|
throw new NotImplementedException("evictOrPut is not supported by versioned cache");
|
||||||
|
|||||||
@ -18,17 +18,32 @@ package org.thingsboard.server.cache;
|
|||||||
import org.thingsboard.server.common.data.HasVersion;
|
import org.thingsboard.server.common.data.HasVersion;
|
||||||
|
|
||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.Optional;
|
||||||
|
import java.util.function.Supplier;
|
||||||
|
|
||||||
public interface VersionedTbCache<K extends Serializable, V extends Serializable & HasVersion> extends TbTransactionalCache<K, V> {
|
public interface VersionedTbCache<K extends Serializable, V extends Serializable & HasVersion> extends TbTransactionalCache<K, V> {
|
||||||
|
|
||||||
TbCacheValueWrapper<V> get(K key);
|
TbCacheValueWrapper<V> get(K key);
|
||||||
|
|
||||||
|
default V get(K key, Supplier<V> supplier) {
|
||||||
|
return Optional.ofNullable(get(key))
|
||||||
|
.map(TbCacheValueWrapper::get)
|
||||||
|
.orElseGet(() -> {
|
||||||
|
V value = supplier.get();
|
||||||
|
put(key, value);
|
||||||
|
return value;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
void put(K key, V value);
|
void put(K key, V value);
|
||||||
|
|
||||||
void put(K key, V value, Long version);
|
void put(K key, V value, Long version);
|
||||||
|
|
||||||
void evict(K key);
|
void evict(K key);
|
||||||
|
|
||||||
|
void evict(Collection<K> keys);
|
||||||
|
|
||||||
void evict(K key, Long version);
|
void evict(K key, Long version);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user