Merge pull request #11612 from thingsboard/fix/versioned-cache-refactoring
Refactor versioned caching
This commit is contained in:
commit
6e79672b97
@ -54,11 +54,6 @@ public abstract class CaffeineTbTransactionalCache<K extends Serializable, V ext
|
||||
return SimpleTbCacheValueWrapper.wrap(cache.get(key));
|
||||
}
|
||||
|
||||
@Override
|
||||
public TbCacheValueWrapper<V> get(K key, boolean transactionMode) {
|
||||
return get(key);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void put(K key, V value) {
|
||||
lock.lock();
|
||||
|
||||
@ -31,7 +31,7 @@ public class RedisTbCacheTransaction<K extends Serializable, V extends Serializa
|
||||
|
||||
@Override
|
||||
public void put(K key, V value) {
|
||||
cache.put(key, value, connection, true);
|
||||
cache.put(key, value, connection);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@ -87,17 +87,11 @@ public abstract class RedisTbTransactionalCache<K extends Serializable, V extend
|
||||
|
||||
@Override
|
||||
public TbCacheValueWrapper<V> get(K key) {
|
||||
return get(key, false);
|
||||
}
|
||||
|
||||
@Override
|
||||
public TbCacheValueWrapper<V> get(K key, boolean transactionMode) {
|
||||
if (!cacheEnabled) {
|
||||
return null;
|
||||
}
|
||||
try (var connection = connectionFactory.getConnection()) {
|
||||
byte[] rawKey = getRawKey(key);
|
||||
byte[] rawValue = doGet(connection, rawKey, transactionMode);
|
||||
byte[] rawValue = doGet(key, connection);
|
||||
if (rawValue == null || rawValue.length == 0) {
|
||||
return null;
|
||||
} else if (Arrays.equals(rawValue, BINARY_NULL_VALUE)) {
|
||||
@ -114,8 +108,8 @@ public abstract class RedisTbTransactionalCache<K extends Serializable, V extend
|
||||
}
|
||||
}
|
||||
|
||||
protected byte[] doGet(RedisConnection connection, byte[] rawKey, boolean transactionMode) {
|
||||
return connection.stringCommands().get(rawKey);
|
||||
protected byte[] doGet(K key, RedisConnection connection) {
|
||||
return connection.stringCommands().get(getRawKey(key));
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -124,11 +118,11 @@ public abstract class RedisTbTransactionalCache<K extends Serializable, V extend
|
||||
return;
|
||||
}
|
||||
try (var connection = connectionFactory.getConnection()) {
|
||||
put(key, value, connection, false);
|
||||
put(key, value, connection);
|
||||
}
|
||||
}
|
||||
|
||||
public void put(K key, V value, RedisConnection connection, boolean transactionMode) {
|
||||
public void put(K key, V value, RedisConnection connection) {
|
||||
put(connection, key, value, RedisStringCommands.SetOption.UPSERT);
|
||||
}
|
||||
|
||||
|
||||
@ -27,8 +27,6 @@ public interface TbTransactionalCache<K extends Serializable, V extends Serializ
|
||||
|
||||
TbCacheValueWrapper<V> get(K key);
|
||||
|
||||
TbCacheValueWrapper<V> get(K key, boolean transactionMode);
|
||||
|
||||
void put(K key, V value);
|
||||
|
||||
void putIfAbsent(K key, V value);
|
||||
@ -53,7 +51,7 @@ public interface TbTransactionalCache<K extends Serializable, V extends Serializ
|
||||
if (putToCache) {
|
||||
return getAndPutInTransaction(key, dbCall, cacheNullValue);
|
||||
} else {
|
||||
TbCacheValueWrapper<V> cacheValueWrapper = get(key, true);
|
||||
TbCacheValueWrapper<V> cacheValueWrapper = get(key);
|
||||
if (cacheValueWrapper != null) {
|
||||
return cacheValueWrapper.get();
|
||||
}
|
||||
@ -66,7 +64,7 @@ public interface TbTransactionalCache<K extends Serializable, V extends Serializ
|
||||
}
|
||||
|
||||
default <R> R getAndPutInTransaction(K key, Supplier<R> dbCall, Function<V, R> cacheValueToResult, Function<R, V> dbValueToCacheValue, boolean cacheNullValue) {
|
||||
TbCacheValueWrapper<V> cacheValueWrapper = get(key, true);
|
||||
TbCacheValueWrapper<V> cacheValueWrapper = get(key);
|
||||
if (cacheValueWrapper != null) {
|
||||
V cacheValue = cacheValueWrapper.get();
|
||||
return cacheValue != null ? cacheValueToResult.apply(cacheValue) : null;
|
||||
@ -92,7 +90,7 @@ public interface TbTransactionalCache<K extends Serializable, V extends Serializ
|
||||
if (putToCache) {
|
||||
return getAndPutInTransaction(key, dbCall, cacheValueToResult, dbValueToCacheValue, cacheNullValue);
|
||||
} else {
|
||||
TbCacheValueWrapper<V> cacheValueWrapper = get(key, true);
|
||||
TbCacheValueWrapper<V> cacheValueWrapper = get(key);
|
||||
if (cacheValueWrapper != null) {
|
||||
var cacheValue = cacheValueWrapper.get();
|
||||
return cacheValue == null ? null : cacheValueToResult.apply(cacheValue);
|
||||
|
||||
26
common/cache/src/main/java/org/thingsboard/server/cache/VersionedCacheKey.java
vendored
Normal file
26
common/cache/src/main/java/org/thingsboard/server/cache/VersionedCacheKey.java
vendored
Normal file
@ -0,0 +1,26 @@
|
||||
/**
|
||||
* Copyright © 2016-2024 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.cache;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
public interface VersionedCacheKey extends Serializable {
|
||||
|
||||
default boolean isVersioned() {
|
||||
return false;
|
||||
}
|
||||
|
||||
}
|
||||
@ -22,7 +22,7 @@ import org.thingsboard.server.common.data.util.TbPair;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
public abstract class VersionedCaffeineTbCache<K extends Serializable, V extends Serializable & HasVersion> extends CaffeineTbTransactionalCache<K, V> implements VersionedTbCache<K, V> {
|
||||
public abstract class VersionedCaffeineTbCache<K extends VersionedCacheKey, V extends Serializable & HasVersion> extends CaffeineTbTransactionalCache<K, V> implements VersionedTbCache<K, V> {
|
||||
|
||||
public VersionedCaffeineTbCache(CacheManager cacheManager, String cacheName) {
|
||||
super(cacheManager, cacheName);
|
||||
|
||||
@ -30,7 +30,7 @@ import java.io.Serializable;
|
||||
import java.util.Arrays;
|
||||
|
||||
@Slf4j
|
||||
public abstract class VersionedRedisTbCache<K extends Serializable, V extends Serializable & HasVersion> extends RedisTbTransactionalCache<K, V> implements VersionedTbCache<K, V> {
|
||||
public abstract class VersionedRedisTbCache<K extends VersionedCacheKey, V extends Serializable & HasVersion> extends RedisTbTransactionalCache<K, V> implements VersionedTbCache<K, V> {
|
||||
|
||||
private static final int VERSION_SIZE = 8;
|
||||
private static final int VALUE_END_OFFSET = -1;
|
||||
@ -79,15 +79,20 @@ public abstract class VersionedRedisTbCache<K extends Serializable, V extends Se
|
||||
}
|
||||
|
||||
@Override
|
||||
protected byte[] doGet(RedisConnection connection, byte[] rawKey, boolean transactionMode) {
|
||||
if (transactionMode) {
|
||||
return super.doGet(connection, rawKey, true);
|
||||
protected byte[] doGet(K key, RedisConnection connection) {
|
||||
if (!key.isVersioned()) {
|
||||
return super.doGet(key, connection);
|
||||
}
|
||||
byte[] rawKey = getRawKey(key);
|
||||
return connection.stringCommands().getRange(rawKey, VERSION_SIZE, VALUE_END_OFFSET);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void put(K key, V value) {
|
||||
if (!key.isVersioned()) {
|
||||
super.put(key, value);
|
||||
return;
|
||||
}
|
||||
Long version = getVersion(value);
|
||||
if (version == null) {
|
||||
return;
|
||||
@ -96,9 +101,9 @@ public abstract class VersionedRedisTbCache<K extends Serializable, V extends Se
|
||||
}
|
||||
|
||||
@Override
|
||||
public void put(K key, V value, RedisConnection connection, boolean transactionMode) {
|
||||
if (transactionMode) {
|
||||
super.put(key, value, connection, true); // because scripting commands are not supported in transaction mode
|
||||
public void put(K key, V value, RedisConnection connection) {
|
||||
if (!key.isVersioned()) {
|
||||
super.put(key, value, connection); // because scripting commands are not supported in transaction mode
|
||||
return;
|
||||
}
|
||||
Long version = getVersion(value);
|
||||
|
||||
@ -22,7 +22,7 @@ import java.util.Collection;
|
||||
import java.util.Optional;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
public interface VersionedTbCache<K extends Serializable, V extends Serializable & HasVersion> extends TbTransactionalCache<K, V> {
|
||||
public interface VersionedTbCache<K extends VersionedCacheKey, V extends Serializable & HasVersion> extends TbTransactionalCache<K, V> {
|
||||
|
||||
TbCacheValueWrapper<V> get(K key);
|
||||
|
||||
|
||||
@ -19,17 +19,17 @@ import lombok.Builder;
|
||||
import lombok.EqualsAndHashCode;
|
||||
import lombok.Getter;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import org.thingsboard.server.cache.VersionedCacheKey;
|
||||
import org.thingsboard.server.common.data.id.DeviceId;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
|
||||
import java.io.Serial;
|
||||
import java.io.Serializable;
|
||||
|
||||
@Getter
|
||||
@EqualsAndHashCode
|
||||
@RequiredArgsConstructor
|
||||
@Builder
|
||||
public class DeviceCacheKey implements Serializable {
|
||||
public class DeviceCacheKey implements VersionedCacheKey {
|
||||
|
||||
@Serial
|
||||
private static final long serialVersionUID = 6366389552842340207L;
|
||||
@ -61,4 +61,9 @@ public class DeviceCacheKey implements Serializable {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isVersioned() {
|
||||
return deviceId != null;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -16,14 +16,14 @@
|
||||
package org.thingsboard.server.dao.asset;
|
||||
|
||||
import lombok.Data;
|
||||
import org.thingsboard.server.cache.VersionedCacheKey;
|
||||
import org.thingsboard.server.common.data.id.AssetProfileId;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
|
||||
import java.io.Serial;
|
||||
import java.io.Serializable;
|
||||
|
||||
@Data
|
||||
public class AssetProfileCacheKey implements Serializable {
|
||||
public class AssetProfileCacheKey implements VersionedCacheKey {
|
||||
|
||||
@Serial
|
||||
private static final long serialVersionUID = 8220455917177676472L;
|
||||
@ -63,4 +63,9 @@ public class AssetProfileCacheKey implements Serializable {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isVersioned() {
|
||||
return assetProfileId != null;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -18,16 +18,16 @@ package org.thingsboard.server.dao.attributes;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.EqualsAndHashCode;
|
||||
import lombok.Getter;
|
||||
import org.thingsboard.server.cache.VersionedCacheKey;
|
||||
import org.thingsboard.server.common.data.AttributeScope;
|
||||
import org.thingsboard.server.common.data.id.EntityId;
|
||||
|
||||
import java.io.Serial;
|
||||
import java.io.Serializable;
|
||||
|
||||
@EqualsAndHashCode
|
||||
@Getter
|
||||
@AllArgsConstructor
|
||||
public class AttributeCacheKey implements Serializable {
|
||||
public class AttributeCacheKey implements VersionedCacheKey {
|
||||
|
||||
@Serial
|
||||
private static final long serialVersionUID = 2013369077925351881L;
|
||||
@ -41,4 +41,9 @@ public class AttributeCacheKey implements Serializable {
|
||||
return "{" + entityId + "}" + scope + "_" + key;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isVersioned() {
|
||||
return true;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -16,15 +16,15 @@
|
||||
package org.thingsboard.server.dao.device;
|
||||
|
||||
import lombok.Data;
|
||||
import org.thingsboard.server.cache.VersionedCacheKey;
|
||||
import org.thingsboard.server.common.data.StringUtils;
|
||||
import org.thingsboard.server.common.data.id.DeviceProfileId;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
|
||||
import java.io.Serial;
|
||||
import java.io.Serializable;
|
||||
|
||||
@Data
|
||||
public class DeviceProfileCacheKey implements Serializable {
|
||||
public class DeviceProfileCacheKey implements VersionedCacheKey {
|
||||
|
||||
@Serial
|
||||
private static final long serialVersionUID = 8220455917177676472L;
|
||||
@ -74,4 +74,9 @@ public class DeviceProfileCacheKey implements Serializable {
|
||||
return tenantId + "_" + name;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isVersioned() {
|
||||
return deviceProfileId != null;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -16,12 +16,13 @@
|
||||
package org.thingsboard.server.dao.entity;
|
||||
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.thingsboard.server.cache.VersionedCacheKey;
|
||||
import org.thingsboard.server.cache.VersionedTbCache;
|
||||
import org.thingsboard.server.common.data.HasVersion;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
public abstract class CachedVersionedEntityService<K extends Serializable, V extends Serializable & HasVersion, E> extends AbstractCachedEntityService<K, V, E> {
|
||||
public abstract class CachedVersionedEntityService<K extends VersionedCacheKey, V extends Serializable & HasVersion, E> extends AbstractCachedEntityService<K, V, E> {
|
||||
|
||||
@Autowired
|
||||
protected VersionedTbCache<K, V> cache;
|
||||
|
||||
@ -18,17 +18,17 @@ package org.thingsboard.server.dao.entityview;
|
||||
import lombok.Builder;
|
||||
import lombok.EqualsAndHashCode;
|
||||
import lombok.Getter;
|
||||
import org.thingsboard.server.cache.VersionedCacheKey;
|
||||
import org.thingsboard.server.common.data.id.EntityId;
|
||||
import org.thingsboard.server.common.data.id.EntityViewId;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
|
||||
import java.io.Serial;
|
||||
import java.io.Serializable;
|
||||
|
||||
@Getter
|
||||
@EqualsAndHashCode
|
||||
@Builder
|
||||
public class EntityViewCacheKey implements Serializable {
|
||||
public class EntityViewCacheKey implements VersionedCacheKey {
|
||||
|
||||
@Serial
|
||||
private static final long serialVersionUID = 5986277528222738163L;
|
||||
@ -68,4 +68,9 @@ public class EntityViewCacheKey implements Serializable {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isVersioned() {
|
||||
return entityViewId != null;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -18,15 +18,15 @@ package org.thingsboard.server.dao.timeseries;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.EqualsAndHashCode;
|
||||
import lombok.Getter;
|
||||
import org.thingsboard.server.cache.VersionedCacheKey;
|
||||
import org.thingsboard.server.common.data.id.EntityId;
|
||||
|
||||
import java.io.Serial;
|
||||
import java.io.Serializable;
|
||||
|
||||
@EqualsAndHashCode
|
||||
@Getter
|
||||
@AllArgsConstructor
|
||||
public class TsLatestCacheKey implements Serializable {
|
||||
public class TsLatestCacheKey implements VersionedCacheKey {
|
||||
|
||||
@Serial
|
||||
private static final long serialVersionUID = 2024369077925351881L;
|
||||
@ -39,4 +39,9 @@ public class TsLatestCacheKey implements Serializable {
|
||||
return "{" + entityId + "}" + key;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isVersioned() {
|
||||
return true;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user