Fix Caffeine's getAndPutInTransaction; add compute method to VersionedTbCache
This commit is contained in:
		
							parent
							
								
									cf821515fb
								
							
						
					
					
						commit
						b894cbc5d2
					
				@ -38,7 +38,7 @@ public class CaffeineTbCacheTransaction<K extends Serializable, V extends Serial
 | 
			
		||||
    @Setter
 | 
			
		||||
    private boolean failed;
 | 
			
		||||
 | 
			
		||||
    private final Map<Object, Object> pendingPuts = new LinkedHashMap<>();
 | 
			
		||||
    private final Map<K, V> pendingPuts = new LinkedHashMap<>();
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void put(K key, V value) {
 | 
			
		||||
 | 
			
		||||
@ -116,7 +116,7 @@ public abstract class CaffeineTbTransactionalCache<K extends Serializable, V ext
 | 
			
		||||
        return newTransaction(keys);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    void doPutIfAbsent(Object key, Object value) {
 | 
			
		||||
    void doPutIfAbsent(K key, V value) {
 | 
			
		||||
        cache.putIfAbsent(key, value);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -139,7 +139,7 @@ public abstract class CaffeineTbTransactionalCache<K extends Serializable, V ext
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public boolean commit(UUID trId, Map<Object, Object> pendingPuts) {
 | 
			
		||||
    public boolean commit(UUID trId, Map<K, V> pendingPuts) {
 | 
			
		||||
        lock.lock();
 | 
			
		||||
        try {
 | 
			
		||||
            var tr = transactions.get(trId);
 | 
			
		||||
 | 
			
		||||
@ -53,7 +53,7 @@ public abstract class VersionedCaffeineTbCache<K extends Serializable, V extends
 | 
			
		||||
            TbPair<Long, V> versionValuePair = doGet(key);
 | 
			
		||||
            if (versionValuePair == null || version > versionValuePair.getFirst()) {
 | 
			
		||||
                failAllTransactionsByKey(key);
 | 
			
		||||
                cache.put(key, TbPair.of(version, value));
 | 
			
		||||
                cache.put(key, wrapValue(value, version));
 | 
			
		||||
            }
 | 
			
		||||
        } finally {
 | 
			
		||||
            lock.unlock();
 | 
			
		||||
@ -84,4 +84,13 @@ public abstract class VersionedCaffeineTbCache<K extends Serializable, V extends
 | 
			
		||||
        put(key, null, version);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    void doPutIfAbsent(K key, V value) {
 | 
			
		||||
        cache.putIfAbsent(key, wrapValue(value, value != null ? value.getVersion() : 0));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private TbPair<Long, V> wrapValue(V value, Long version) {
 | 
			
		||||
        return TbPair.of(version, value);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -28,7 +28,6 @@ import org.thingsboard.server.common.data.HasVersion;
 | 
			
		||||
 | 
			
		||||
import java.io.Serializable;
 | 
			
		||||
import java.util.Arrays;
 | 
			
		||||
import java.util.Collection;
 | 
			
		||||
 | 
			
		||||
@Slf4j
 | 
			
		||||
public abstract class VersionedRedisTbCache<K extends Serializable, V extends Serializable & HasVersion> extends RedisTbTransactionalCache<K, V> implements VersionedTbCache<K, V> {
 | 
			
		||||
@ -156,11 +155,6 @@ public abstract class VersionedRedisTbCache<K extends Serializable, V extends Se
 | 
			
		||||
        throw new NotImplementedException("putIfAbsent is not supported by versioned cache");
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void evict(Collection<K> keys) {
 | 
			
		||||
        throw new NotImplementedException("evict by many keys is not supported by versioned cache");
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void evictOrPut(K key, V value) {
 | 
			
		||||
        throw new NotImplementedException("evictOrPut is not supported by versioned cache");
 | 
			
		||||
 | 
			
		||||
@ -18,17 +18,32 @@ package org.thingsboard.server.cache;
 | 
			
		||||
import org.thingsboard.server.common.data.HasVersion;
 | 
			
		||||
 | 
			
		||||
import java.io.Serializable;
 | 
			
		||||
import java.util.Collection;
 | 
			
		||||
import java.util.Optional;
 | 
			
		||||
import java.util.function.Supplier;
 | 
			
		||||
 | 
			
		||||
public interface VersionedTbCache<K extends Serializable, V extends Serializable & HasVersion> extends TbTransactionalCache<K, V> {
 | 
			
		||||
 | 
			
		||||
    TbCacheValueWrapper<V> get(K key);
 | 
			
		||||
 | 
			
		||||
    default V get(K key, Supplier<V> supplier) {
 | 
			
		||||
        return Optional.ofNullable(get(key))
 | 
			
		||||
                .map(TbCacheValueWrapper::get)
 | 
			
		||||
                .orElseGet(() -> {
 | 
			
		||||
                    V value = supplier.get();
 | 
			
		||||
                    put(key, value);
 | 
			
		||||
                    return value;
 | 
			
		||||
                });
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    void put(K key, V value);
 | 
			
		||||
 | 
			
		||||
    void put(K key, V value, Long version);
 | 
			
		||||
 | 
			
		||||
    void evict(K key);
 | 
			
		||||
 | 
			
		||||
    void evict(Collection<K> keys);
 | 
			
		||||
 | 
			
		||||
    void evict(K key, Long version);
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user