Refactor versioned caching

This commit is contained in:
ViacheslavKlimov 2024-09-10 13:42:27 +03:00
parent a87a6cb118
commit 53a975681c
15 changed files with 93 additions and 44 deletions

View File

@ -0,0 +1,26 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.cache;
import java.io.Serializable;
public interface CacheKey extends Serializable {
default boolean isVersioned() {
return false;
}
}

View File

@ -54,11 +54,6 @@ public abstract class CaffeineTbTransactionalCache<K extends Serializable, V ext
return SimpleTbCacheValueWrapper.wrap(cache.get(key));
}
@Override
public TbCacheValueWrapper<V> get(K key, boolean transactionMode) {
return get(key);
}
@Override
public void put(K key, V value) {
lock.lock();

View File

@ -31,7 +31,7 @@ public class RedisTbCacheTransaction<K extends Serializable, V extends Serializa
@Override
public void put(K key, V value) {
cache.put(key, value, connection, true);
cache.put(key, value, connection);
}
@Override

View File

@ -87,17 +87,11 @@ public abstract class RedisTbTransactionalCache<K extends Serializable, V extend
@Override
public TbCacheValueWrapper<V> get(K key) {
return get(key, false);
}
@Override
public TbCacheValueWrapper<V> get(K key, boolean transactionMode) {
if (!cacheEnabled) {
return null;
}
try (var connection = connectionFactory.getConnection()) {
byte[] rawKey = getRawKey(key);
byte[] rawValue = doGet(connection, rawKey, transactionMode);
byte[] rawValue = doGet(key, connection);
if (rawValue == null || rawValue.length == 0) {
return null;
} else if (Arrays.equals(rawValue, BINARY_NULL_VALUE)) {
@ -114,8 +108,8 @@ public abstract class RedisTbTransactionalCache<K extends Serializable, V extend
}
}
protected byte[] doGet(RedisConnection connection, byte[] rawKey, boolean transactionMode) {
return connection.stringCommands().get(rawKey);
protected byte[] doGet(K key, RedisConnection connection) {
return connection.stringCommands().get(getRawKey(key));
}
@Override
@ -124,11 +118,11 @@ public abstract class RedisTbTransactionalCache<K extends Serializable, V extend
return;
}
try (var connection = connectionFactory.getConnection()) {
put(key, value, connection, false);
put(key, value, connection);
}
}
public void put(K key, V value, RedisConnection connection, boolean transactionMode) {
public void put(K key, V value, RedisConnection connection) {
put(connection, key, value, RedisStringCommands.SetOption.UPSERT);
}

View File

@ -27,8 +27,6 @@ public interface TbTransactionalCache<K extends Serializable, V extends Serializ
TbCacheValueWrapper<V> get(K key);
TbCacheValueWrapper<V> get(K key, boolean transactionMode);
void put(K key, V value);
void putIfAbsent(K key, V value);
@ -53,7 +51,7 @@ public interface TbTransactionalCache<K extends Serializable, V extends Serializ
if (putToCache) {
return getAndPutInTransaction(key, dbCall, cacheNullValue);
} else {
TbCacheValueWrapper<V> cacheValueWrapper = get(key, true);
TbCacheValueWrapper<V> cacheValueWrapper = get(key);
if (cacheValueWrapper != null) {
return cacheValueWrapper.get();
}
@ -66,7 +64,7 @@ public interface TbTransactionalCache<K extends Serializable, V extends Serializ
}
default <R> R getAndPutInTransaction(K key, Supplier<R> dbCall, Function<V, R> cacheValueToResult, Function<R, V> dbValueToCacheValue, boolean cacheNullValue) {
TbCacheValueWrapper<V> cacheValueWrapper = get(key, true);
TbCacheValueWrapper<V> cacheValueWrapper = get(key);
if (cacheValueWrapper != null) {
V cacheValue = cacheValueWrapper.get();
return cacheValue != null ? cacheValueToResult.apply(cacheValue) : null;
@ -92,7 +90,7 @@ public interface TbTransactionalCache<K extends Serializable, V extends Serializ
if (putToCache) {
return getAndPutInTransaction(key, dbCall, cacheValueToResult, dbValueToCacheValue, cacheNullValue);
} else {
TbCacheValueWrapper<V> cacheValueWrapper = get(key, true);
TbCacheValueWrapper<V> cacheValueWrapper = get(key);
if (cacheValueWrapper != null) {
var cacheValue = cacheValueWrapper.get();
return cacheValue == null ? null : cacheValueToResult.apply(cacheValue);

View File

@ -22,7 +22,7 @@ import org.thingsboard.server.common.data.util.TbPair;
import java.io.Serializable;
public abstract class VersionedCaffeineTbCache<K extends Serializable, V extends Serializable & HasVersion> extends CaffeineTbTransactionalCache<K, V> implements VersionedTbCache<K, V> {
public abstract class VersionedCaffeineTbCache<K extends CacheKey, V extends Serializable & HasVersion> extends CaffeineTbTransactionalCache<K, V> implements VersionedTbCache<K, V> {
public VersionedCaffeineTbCache(CacheManager cacheManager, String cacheName) {
super(cacheManager, cacheName);

View File

@ -30,7 +30,7 @@ import java.io.Serializable;
import java.util.Arrays;
@Slf4j
public abstract class VersionedRedisTbCache<K extends Serializable, V extends Serializable & HasVersion> extends RedisTbTransactionalCache<K, V> implements VersionedTbCache<K, V> {
public abstract class VersionedRedisTbCache<K extends CacheKey, V extends Serializable & HasVersion> extends RedisTbTransactionalCache<K, V> implements VersionedTbCache<K, V> {
private static final int VERSION_SIZE = 8;
private static final int VALUE_END_OFFSET = -1;
@ -79,15 +79,20 @@ public abstract class VersionedRedisTbCache<K extends Serializable, V extends Se
}
@Override
protected byte[] doGet(RedisConnection connection, byte[] rawKey, boolean transactionMode) {
if (transactionMode) {
return super.doGet(connection, rawKey, true);
protected byte[] doGet(K key, RedisConnection connection) {
if (!key.isVersioned()) {
return super.doGet(key, connection);
}
byte[] rawKey = getRawKey(key);
return connection.stringCommands().getRange(rawKey, VERSION_SIZE, VALUE_END_OFFSET);
}
@Override
public void put(K key, V value) {
if (!key.isVersioned()) {
super.put(key, value);
return;
}
Long version = getVersion(value);
if (version == null) {
return;
@ -96,9 +101,9 @@ public abstract class VersionedRedisTbCache<K extends Serializable, V extends Se
}
@Override
public void put(K key, V value, RedisConnection connection, boolean transactionMode) {
if (transactionMode) {
super.put(key, value, connection, true); // because scripting commands are not supported in transaction mode
public void put(K key, V value, RedisConnection connection) {
if (!key.isVersioned()) {
super.put(key, value, connection); // because scripting commands are not supported in transaction mode
return;
}
Long version = getVersion(value);

View File

@ -22,7 +22,7 @@ import java.util.Collection;
import java.util.Optional;
import java.util.function.Supplier;
public interface VersionedTbCache<K extends Serializable, V extends Serializable & HasVersion> extends TbTransactionalCache<K, V> {
public interface VersionedTbCache<K extends CacheKey, V extends Serializable & HasVersion> extends TbTransactionalCache<K, V> {
TbCacheValueWrapper<V> get(K key);

View File

@ -19,17 +19,17 @@ import lombok.Builder;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.thingsboard.server.cache.CacheKey;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.TenantId;
import java.io.Serial;
import java.io.Serializable;
@Getter
@EqualsAndHashCode
@RequiredArgsConstructor
@Builder
public class DeviceCacheKey implements Serializable {
public class DeviceCacheKey implements CacheKey {
@Serial
private static final long serialVersionUID = 6366389552842340207L;
@ -61,4 +61,9 @@ public class DeviceCacheKey implements Serializable {
}
}
@Override
public boolean isVersioned() {
return deviceId != null;
}
}

View File

@ -16,14 +16,14 @@
package org.thingsboard.server.dao.asset;
import lombok.Data;
import org.thingsboard.server.cache.CacheKey;
import org.thingsboard.server.common.data.id.AssetProfileId;
import org.thingsboard.server.common.data.id.TenantId;
import java.io.Serial;
import java.io.Serializable;
@Data
public class AssetProfileCacheKey implements Serializable {
public class AssetProfileCacheKey implements CacheKey {
@Serial
private static final long serialVersionUID = 8220455917177676472L;
@ -63,4 +63,9 @@ public class AssetProfileCacheKey implements Serializable {
}
}
@Override
public boolean isVersioned() {
return assetProfileId != null;
}
}

View File

@ -18,16 +18,16 @@ package org.thingsboard.server.dao.attributes;
import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import org.thingsboard.server.cache.CacheKey;
import org.thingsboard.server.common.data.AttributeScope;
import org.thingsboard.server.common.data.id.EntityId;
import java.io.Serial;
import java.io.Serializable;
@EqualsAndHashCode
@Getter
@AllArgsConstructor
public class AttributeCacheKey implements Serializable {
public class AttributeCacheKey implements CacheKey {
@Serial
private static final long serialVersionUID = 2013369077925351881L;
@ -41,4 +41,9 @@ public class AttributeCacheKey implements Serializable {
return "{" + entityId + "}" + scope + "_" + key;
}
@Override
public boolean isVersioned() {
return true;
}
}

View File

@ -16,15 +16,15 @@
package org.thingsboard.server.dao.device;
import lombok.Data;
import org.thingsboard.server.cache.CacheKey;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.id.DeviceProfileId;
import org.thingsboard.server.common.data.id.TenantId;
import java.io.Serial;
import java.io.Serializable;
@Data
public class DeviceProfileCacheKey implements Serializable {
public class DeviceProfileCacheKey implements CacheKey {
@Serial
private static final long serialVersionUID = 8220455917177676472L;
@ -74,4 +74,9 @@ public class DeviceProfileCacheKey implements Serializable {
return tenantId + "_" + name;
}
@Override
public boolean isVersioned() {
return deviceProfileId != null;
}
}

View File

@ -16,12 +16,13 @@
package org.thingsboard.server.dao.entity;
import org.springframework.beans.factory.annotation.Autowired;
import org.thingsboard.server.cache.CacheKey;
import org.thingsboard.server.cache.VersionedTbCache;
import org.thingsboard.server.common.data.HasVersion;
import java.io.Serializable;
public abstract class CachedVersionedEntityService<K extends Serializable, V extends Serializable & HasVersion, E> extends AbstractCachedEntityService<K, V, E> {
public abstract class CachedVersionedEntityService<K extends CacheKey, V extends Serializable & HasVersion, E> extends AbstractCachedEntityService<K, V, E> {
@Autowired
protected VersionedTbCache<K, V> cache;

View File

@ -18,17 +18,17 @@ package org.thingsboard.server.dao.entityview;
import lombok.Builder;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import org.thingsboard.server.cache.CacheKey;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.EntityViewId;
import org.thingsboard.server.common.data.id.TenantId;
import java.io.Serial;
import java.io.Serializable;
@Getter
@EqualsAndHashCode
@Builder
public class EntityViewCacheKey implements Serializable {
public class EntityViewCacheKey implements CacheKey {
@Serial
private static final long serialVersionUID = 5986277528222738163L;
@ -68,4 +68,9 @@ public class EntityViewCacheKey implements Serializable {
}
}
@Override
public boolean isVersioned() {
return entityViewId != null;
}
}

View File

@ -18,15 +18,15 @@ package org.thingsboard.server.dao.timeseries;
import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import org.thingsboard.server.cache.CacheKey;
import org.thingsboard.server.common.data.id.EntityId;
import java.io.Serial;
import java.io.Serializable;
@EqualsAndHashCode
@Getter
@AllArgsConstructor
public class TsLatestCacheKey implements Serializable {
public class TsLatestCacheKey implements CacheKey {
@Serial
private static final long serialVersionUID = 2024369077925351881L;
@ -39,4 +39,9 @@ public class TsLatestCacheKey implements Serializable {
return "{" + entityId + "}" + key;
}
@Override
public boolean isVersioned() {
return true;
}
}