Redis Cache implementation

This commit is contained in:
Andrii Shvaika 2022-05-06 19:12:41 +03:00
parent 7eaa70e472
commit 9f3f2985ab
23 changed files with 401 additions and 142 deletions

View File

@ -387,8 +387,6 @@ cache:
attributes:
# make sure that if cache.type is 'redis' and cache.attributes.enabled is 'true' that you change 'maxmemory-policy' Redis config property to 'allkeys-lru', 'allkeys-lfu' or 'allkeys-random'
enabled: "${CACHE_ATTRIBUTES_ENABLED:true}"
caffeine:
specs:
relations:
timeToLiveInMinutes: "${CACHE_SPECS_RELATIONS_TTL:1440}"
@ -475,6 +473,8 @@ redis:
maxWaitMills: "${REDIS_POOL_CONFIG_MAX_WAIT_MS:60000}"
numberTestsPerEvictionRun: "${REDIS_POOL_CONFIG_NUMBER_TESTS_PER_EVICTION_RUN:3}"
blockWhenExhausted: "${REDIS_POOL_CONFIG_BLOCK_WHEN_EXHAUSTED:true}"
# TTL for short-living SET commands that are used to replace DEL in order to enable transaction support
evictTtlInMs: "${REDIS_EVICT_TTL_MS:60000}"
# Check new version updates parameters
updates:

View File

@ -36,15 +36,15 @@ import static org.assertj.core.api.Assertions.assertThat;
public class CaffeineCacheDefaultConfigurationTest {
@Autowired
CaffeineCacheConfiguration caffeineCacheConfiguration;
CacheSpecsMap cacheSpecsMap;
@Test
public void verifyTransactionAwareCacheManagerProxy() {
assertThat(caffeineCacheConfiguration.getSpecs()).as("specs").isNotNull();
caffeineCacheConfiguration.getSpecs().forEach((name, cacheSpecs)->assertThat(cacheSpecs).as("cache %s specs", name).isNotNull());
assertThat(cacheSpecsMap.getSpecs()).as("specs").isNotNull();
cacheSpecsMap.getSpecs().forEach((name, cacheSpecs)->assertThat(cacheSpecs).as("cache %s specs", name).isNotNull());
SoftAssertions softly = new SoftAssertions();
caffeineCacheConfiguration.getSpecs().forEach((name, cacheSpecs)->{
cacheSpecsMap.getSpecs().forEach((name, cacheSpecs)->{
softly.assertThat(name).as("cache name").isNotEmpty();
softly.assertThat(cacheSpecs.getTimeToLiveInMinutes()).as("cache %s time to live", name).isGreaterThan(0);
softly.assertThat(cacheSpecs.getMaxSize()).as("cache %s max size", name).isGreaterThan(0);

View File

@ -0,0 +1,33 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.cache;
import lombok.Data;
import lombok.Getter;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import java.util.Map;
@Configuration
@ConfigurationProperties(prefix = "cache")
@Data
public class CacheSpecsMap {
@Getter
private Map<String, CacheSpecs> specs;
}

View File

@ -16,6 +16,8 @@
package org.thingsboard.server.cache;
import lombok.Data;
import lombok.Getter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.cache.CacheManager;
@ -41,6 +43,9 @@ import java.time.Duration;
@Data
public abstract class TBRedisCacheConfiguration {
@Value("${redis.evictTtlInMs:60000}")
private int evictTtlInMs;
@Value("${redis.pool_config.maxTotal:128}")
private int maxTotal;

View File

@ -29,7 +29,7 @@ import java.util.Collections;
import java.util.List;
@Configuration
@ConditionalOnMissingBean(CaffeineCacheConfiguration.class)
@ConditionalOnMissingBean(TbCaffeineCacheConfiguration.class)
@ConditionalOnProperty(prefix = "redis.connection", value = "type", havingValue = "cluster")
public class TBRedisClusterConfiguration extends TBRedisCacheConfiguration {

View File

@ -26,7 +26,7 @@ import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import java.time.Duration;
@Configuration
@ConditionalOnMissingBean(CaffeineCacheConfiguration.class)
@ConditionalOnMissingBean(TbCaffeineCacheConfiguration.class)
@ConditionalOnProperty(prefix = "redis.connection", value = "type", havingValue = "standalone")
public class TBRedisStandaloneConfiguration extends TBRedisCacheConfiguration {

View File

@ -15,10 +15,6 @@
*/
package org.thingsboard.server.cache;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.concurrent.Executor;
public interface TbCacheTransaction<K, V> {
void putIfAbsent(K key, V value);
@ -27,5 +23,4 @@ public interface TbCacheTransaction<K, V> {
void rollback();
<T> void rollBackOnFailure(ListenableFuture<T> result, Executor cacheExecutor);
}

View File

@ -19,12 +19,9 @@ import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.RemovalCause;
import com.github.benmanes.caffeine.cache.Ticker;
import com.github.benmanes.caffeine.cache.Weigher;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.cache.CacheManager;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.cache.caffeine.CaffeineCache;
@ -36,23 +33,20 @@ import org.springframework.context.annotation.Configuration;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@Configuration
@ConditionalOnProperty(prefix = "cache", value = "type", havingValue = "caffeine", matchIfMissing = true)
@ConfigurationProperties(prefix = "caffeine")
@EnableCaching
@Data
@Slf4j
public class CaffeineCacheConfiguration {
public class TbCaffeineCacheConfiguration {
@Value("${cache.type}")
private String test;
private Map<String, CacheSpecs> specs;
private final CacheSpecsMap configuration;
public TbCaffeineCacheConfiguration(CacheSpecsMap configuration) {
this.configuration = configuration;
}
/**
* Transaction aware CaffeineCache implementation with TransactionAwareCacheManagerProxy
@ -60,11 +54,11 @@ public class CaffeineCacheConfiguration {
*/
@Bean
public CacheManager cacheManager() {
log.trace("Initializing cache: {} specs {}", Arrays.toString(RemovalCause.values()), specs);
log.trace("Initializing cache: {} specs {}", Arrays.toString(RemovalCause.values()), configuration.getSpecs());
SimpleCacheManager manager = new SimpleCacheManager();
if (specs != null) {
if (configuration.getSpecs() != null) {
List<CaffeineCache> caches =
specs.entrySet().stream()
configuration.getSpecs().entrySet().stream()
.map(entry -> buildCache(entry.getKey(),
entry.getValue()))
.collect(Collectors.toList());
@ -100,4 +94,5 @@ public class CaffeineCacheConfiguration {
return 1;
};
}
}

View File

@ -28,6 +28,8 @@ public interface TbTransactionalCache<K extends Serializable, V extends Serializ
void evict(K key);
void evictOrPut(K key, V value);
TbCacheTransaction<K, V> newTransactionForKey(K key);
TbCacheTransaction<K, V> newTransactionForKeys(List<K> keys);

View File

@ -30,16 +30,16 @@ import org.springframework.test.context.junit.jupiter.SpringExtension;
import static org.assertj.core.api.Assertions.assertThat;
@ExtendWith(SpringExtension.class)
@ContextConfiguration(classes = CaffeineCacheConfiguration.class)
@ContextConfiguration(classes = {CacheSpecsMap.class, TbCaffeineCacheConfiguration.class})
@EnableConfigurationProperties
@TestPropertySource(properties = {
"cache.type=caffeine",
"caffeine.specs.relations.timeToLiveInMinutes=1440",
"caffeine.specs.relations.maxSize=0",
"caffeine.specs.devices.timeToLiveInMinutes=60",
"caffeine.specs.devices.maxSize=100"})
"cache.specs.relations.timeToLiveInMinutes=1440",
"cache.specs.relations.maxSize=0",
"cache.specs.devices.timeToLiveInMinutes=60",
"cache.specs.devices.maxSize=100"})
@Slf4j
public class CaffeineCacheConfigurationTest {
public class CacheSpecsMapTest {
@Autowired
CacheManager cacheManager;

View File

@ -15,10 +15,15 @@
*/
package org.thingsboard.server.dao.attributes;
import lombok.Getter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.cache.CacheManager;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.SerializationException;
import org.springframework.stereotype.Service;
import org.thingsboard.server.cache.CacheSpecsMap;
import org.thingsboard.server.cache.TBRedisCacheConfiguration;
import org.thingsboard.server.common.data.CacheConstants;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.dao.cache.RedisTbTransactionalCache;
@ -27,7 +32,20 @@ import org.thingsboard.server.dao.cache.RedisTbTransactionalCache;
@Service("AttributeCache")
public class AttributeRedisCache extends RedisTbTransactionalCache<AttributeCacheKey, AttributeKvEntry> {
public AttributeRedisCache(CacheManager cacheManager, RedisConnectionFactory connectionFactory) {
super(cacheManager, CacheConstants.ATTRIBUTES_CACHE, connectionFactory);
public AttributeRedisCache(TBRedisCacheConfiguration configuration, CacheSpecsMap cacheSpecsMap, RedisConnectionFactory connectionFactory) {
super(CacheConstants.ATTRIBUTES_CACHE, cacheSpecsMap, connectionFactory, configuration, new RedisSerializer<>() {
private final RedisSerializer<Object> java = RedisSerializer.java();
@Override
public byte[] serialize(AttributeKvEntry attributeKvEntry) throws SerializationException {
return java.serialize(attributeKvEntry);
}
@Override
public AttributeKvEntry deserialize(byte[] bytes) throws SerializationException {
return (AttributeKvEntry) java.deserialize(bytes);
}
});
}
}

View File

@ -31,11 +31,11 @@ import java.util.Optional;
*/
public interface AttributesDao {
ListenableFuture<Optional<AttributeKvEntry>> find(TenantId tenantId, EntityId entityId, String attributeType, String attributeKey);
Optional<AttributeKvEntry> find(TenantId tenantId, EntityId entityId, String attributeType, String attributeKey);
ListenableFuture<List<AttributeKvEntry>> find(TenantId tenantId, EntityId entityId, String attributeType, Collection<String> attributeKey);
List<AttributeKvEntry> find(TenantId tenantId, EntityId entityId, String attributeType, Collection<String> attributeKey);
ListenableFuture<List<AttributeKvEntry>> findAll(TenantId tenantId, EntityId entityId, String attributeType);
List<AttributeKvEntry> findAll(TenantId tenantId, EntityId entityId, String attributeType);
ListenableFuture<String> save(TenantId tenantId, EntityId entityId, String attributeType, AttributeKvEntry attribute);

View File

@ -53,20 +53,20 @@ public class BaseAttributesService implements AttributesService {
public ListenableFuture<Optional<AttributeKvEntry>> find(TenantId tenantId, EntityId entityId, String scope, String attributeKey) {
validate(entityId, scope);
Validator.validateString(attributeKey, "Incorrect attribute key " + attributeKey);
return attributesDao.find(tenantId, entityId, scope, attributeKey);
return Futures.immediateFuture(attributesDao.find(tenantId, entityId, scope, attributeKey));
}
@Override
public ListenableFuture<List<AttributeKvEntry>> find(TenantId tenantId, EntityId entityId, String scope, Collection<String> attributeKeys) {
validate(entityId, scope);
attributeKeys.forEach(attributeKey -> Validator.validateString(attributeKey, "Incorrect attribute key " + attributeKey));
return attributesDao.find(tenantId, entityId, scope, attributeKeys);
return Futures.immediateFuture(attributesDao.find(tenantId, entityId, scope, attributeKeys));
}
@Override
public ListenableFuture<List<AttributeKvEntry>> findAll(TenantId tenantId, EntityId entityId, String scope) {
validate(entityId, scope);
return attributesDao.findAll(tenantId, entityId, scope);
return Futures.immediateFuture(attributesDao.findAll(tenantId, entityId, scope));
}
@Override

View File

@ -17,17 +17,16 @@ package org.thingsboard.server.dao.attributes;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.cache.Cache;
import org.springframework.context.annotation.Primary;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import org.thingsboard.server.cache.TbCacheValueWrapper;
import org.thingsboard.server.common.data.CacheConstants;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.id.DeviceProfileId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
@ -48,7 +47,6 @@ import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;
import static org.thingsboard.server.dao.attributes.AttributeUtils.validate;
@ -66,7 +64,7 @@ public class CachedAttributesService implements AttributesService {
private final DefaultCounter hitCounter;
private final DefaultCounter missCounter;
private final TbTransactionalCache<AttributeCacheKey, AttributeKvEntry> cache;
private Executor cacheExecutor;
private ListeningExecutorService cacheExecutor;
@Value("${cache.type}")
private String cacheType;
@ -93,13 +91,13 @@ public class CachedAttributesService implements AttributesService {
* - for the <b>local</b> cache type (cache.type="coffeine"): directExecutor (run callback immediately in the same thread)
* - for the <b>remote</b> cache: dedicated thread pool for the cache IO calls to unblock any caller thread
*/
Executor getExecutor(String cacheType, CacheExecutorService cacheExecutorService) {
ListeningExecutorService getExecutor(String cacheType, CacheExecutorService cacheExecutorService) {
if (StringUtils.isEmpty(cacheType) || LOCAL_CACHE_TYPE.equals(cacheType)) {
log.info("Going to use directExecutor for the local cache type {}", cacheType);
return MoreExecutors.directExecutor();
return MoreExecutors.newDirectExecutorService();
}
log.info("Going to use cacheExecutorService for the remote cache type {}", cacheType);
return cacheExecutorService;
return cacheExecutorService.executor();
}
@ -116,14 +114,18 @@ public class CachedAttributesService implements AttributesService {
return Futures.immediateFuture(Optional.ofNullable(cachedAttributeKvEntry));
} else {
missCounter.increment();
var cacheTransaction = cache.newTransactionForKey(attributeCacheKey);
ListenableFuture<Optional<AttributeKvEntry>> result = attributesDao.find(tenantId, entityId, scope, attributeKey);
cacheTransaction.rollBackOnFailure(result, cacheExecutor);
return Futures.transform(result, foundAttrKvEntry -> {
cacheTransaction.putIfAbsent(attributeCacheKey, foundAttrKvEntry.orElse(null));
cacheTransaction.commit();
return foundAttrKvEntry;
}, cacheExecutor);
return cacheExecutor.submit(() -> {
var cacheTransaction = cache.newTransactionForKey(attributeCacheKey);
try {
Optional<AttributeKvEntry> result = attributesDao.find(tenantId, entityId, scope, attributeKey);
cacheTransaction.putIfAbsent(attributeCacheKey, result.orElse(null));
cacheTransaction.commit();
return result;
} catch (Throwable e) {
cacheTransaction.rollback();
throw e;
}
});
}
}
@ -147,23 +149,27 @@ public class CachedAttributesService implements AttributesService {
List<AttributeCacheKey> notFoundKeys = notFoundAttributeKeys.stream().map(k -> new AttributeCacheKey(scope, entityId, k)).collect(Collectors.toList());
var cacheTransaction = cache.newTransactionForKeys(notFoundKeys);
ListenableFuture<List<AttributeKvEntry>> result = attributesDao.find(tenantId, entityId, scope, notFoundAttributeKeys);
return Futures.transform(result, foundInDbAttributes -> {
for (AttributeKvEntry foundInDbAttribute : foundInDbAttributes) {
AttributeCacheKey attributeCacheKey = new AttributeCacheKey(scope, entityId, foundInDbAttribute.getKey());
cacheTransaction.putIfAbsent(attributeCacheKey, foundInDbAttribute);
notFoundAttributeKeys.remove(foundInDbAttribute.getKey());
return cacheExecutor.submit(() -> {
var cacheTransaction = cache.newTransactionForKeys(notFoundKeys);
try {
List<AttributeKvEntry> result = attributesDao.find(tenantId, entityId, scope, notFoundAttributeKeys);
for (AttributeKvEntry foundInDbAttribute : result) {
AttributeCacheKey attributeCacheKey = new AttributeCacheKey(scope, entityId, foundInDbAttribute.getKey());
cacheTransaction.putIfAbsent(attributeCacheKey, foundInDbAttribute);
notFoundAttributeKeys.remove(foundInDbAttribute.getKey());
}
for (String key : notFoundAttributeKeys) {
cacheTransaction.putIfAbsent(new AttributeCacheKey(scope, entityId, key), null);
}
List<AttributeKvEntry> mergedAttributes = new ArrayList<>(cachedAttributes);
mergedAttributes.addAll(result);
cacheTransaction.commit();
return mergedAttributes;
} catch (Throwable e) {
cacheTransaction.rollback();
throw e;
}
for (String key : notFoundAttributeKeys) {
cacheTransaction.putIfAbsent(new AttributeCacheKey(scope, entityId, key), null);
}
List<AttributeKvEntry> mergedAttributes = new ArrayList<>(cachedAttributes);
mergedAttributes.addAll(foundInDbAttributes);
cacheTransaction.commit();
return mergedAttributes;
}, cacheExecutor);
});
}
private Map<String, TbCacheValueWrapper<AttributeKvEntry>> findCachedAttributes(EntityId entityId, String scope, Collection<String> attributeKeys) {
@ -183,7 +189,7 @@ public class CachedAttributesService implements AttributesService {
@Override
public ListenableFuture<List<AttributeKvEntry>> findAll(TenantId tenantId, EntityId entityId, String scope) {
validate(entityId, scope);
return attributesDao.findAll(tenantId, entityId, scope);
return Futures.immediateFuture(attributesDao.findAll(tenantId, entityId, scope));
}
@Override
@ -205,7 +211,7 @@ public class CachedAttributesService implements AttributesService {
for (var attribute : attributes) {
ListenableFuture<String> future = attributesDao.save(tenantId, entityId, scope, attribute);
futures.add(Futures.transform(future, key -> {
cache.evict(new AttributeCacheKey(scope, entityId, key));
cache.evictOrPut(new AttributeCacheKey(scope, entityId, key), attribute);
return key;
}, cacheExecutor));
}

View File

@ -15,14 +15,10 @@
*/
package org.thingsboard.server.dao.cache;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.thingsboard.server.cache.TbCacheTransaction;
import java.io.Serializable;
@ -30,7 +26,6 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Executor;
@Slf4j
@RequiredArgsConstructor
@ -61,19 +56,5 @@ public class CaffeineTbCacheTransaction<K extends Serializable, V extends Serial
cache.rollback(id);
}
@Override
public <T> void rollBackOnFailure(ListenableFuture<T> future, Executor executor) {
Futures.addCallback(future, new FutureCallback<>() {
@Override
public void onSuccess(@Nullable T result) {
}
@Override
public void onFailure(Throwable t) {
log.trace("[{}] Rollback transaction due to error", id, t);
rollback();
}
}, executor);
}
}

View File

@ -71,6 +71,12 @@ public abstract class CaffeineTbTransactionalCache<K extends Serializable, V ext
}
}
@Override
public void evictOrPut(K key, V value) {
//No need to put the value in case of Caffeine, because evict will cancel concurrent transaction used to "get" the missing value from cache.
evict(key);
}
@Override
public TbCacheTransaction<K, V> newTransactionForKey(K key) {
return newTransaction(Collections.singletonList(key));

View File

@ -0,0 +1,70 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.dao.cache;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.springframework.data.redis.connection.RedisConnection;
import org.thingsboard.server.cache.TbCacheTransaction;
import java.io.Serializable;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.Executor;
@Slf4j
@RequiredArgsConstructor
public class RedisTbCacheTransaction<K extends Serializable, V extends Serializable> implements TbCacheTransaction<K, V> {
private final RedisTbTransactionalCache<K, V> cache;
private final RedisConnection connection;
@Override
public void putIfAbsent(K key, V value) {
cache.putIfAbsent(connection, key, value);
}
@Override
public boolean commit() {
try {
var execResult = connection.exec();
var result = execResult!= null && execResult.stream().anyMatch(Objects::nonNull);
log.warn("Transaction result: {}", result);
return result;
} finally {
connection.close();
}
}
@Override
public void rollback() {
try {
connection.discard();
} finally {
connection.close();
}
}
}

View File

@ -17,45 +17,155 @@ package org.thingsboard.server.dao.cache;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.springframework.cache.CacheManager;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cache.support.NullValue;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.RedisStringCommands;
import org.springframework.data.redis.core.types.Expiration;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.thingsboard.server.cache.CacheSpecs;
import org.thingsboard.server.cache.CacheSpecsMap;
import org.thingsboard.server.cache.TBRedisCacheConfiguration;
import org.thingsboard.server.cache.TbCacheTransaction;
import org.thingsboard.server.cache.TbCacheValueWrapper;
import org.thingsboard.server.cache.TbTransactionalCache;
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
@RequiredArgsConstructor
@Slf4j
public abstract class RedisTbTransactionalCache<K extends Serializable, V extends Serializable> implements TbTransactionalCache<K, V> {
private final CacheManager cacheManager;
private static final byte[] BINARY_NULL_VALUE = RedisSerializer.java().serialize(NullValue.INSTANCE);
@Getter
private final String cacheName;
private final RedisConnectionFactory connectionFactory;
private final RedisSerializer<String> keySerializer = new StringRedisSerializer();
private final RedisSerializer<V> valueSerializer;
private final Expiration evictExpiration;
private final Expiration cacheTtl;
public RedisTbTransactionalCache(String cacheName,
CacheSpecsMap cacheSpecsMap,
RedisConnectionFactory connectionFactory,
TBRedisCacheConfiguration configuration,
RedisSerializer<V> valueSerializer) {
this.cacheName = cacheName;
this.connectionFactory = connectionFactory;
this.valueSerializer = valueSerializer;
this.evictExpiration = Expiration.from(configuration.getEvictTtlInMs(), TimeUnit.MILLISECONDS);
CacheSpecs cacheSpecs = cacheSpecsMap.getSpecs().get(cacheName);
if (cacheSpecs == null) {
throw new RuntimeException("Missing cache specs for " + cacheSpecs);
}
this.cacheTtl = Expiration.from(cacheSpecs.getTimeToLiveInMinutes(), TimeUnit.MINUTES);
}
@Override
public TbCacheValueWrapper<V> get(K key) {
return null;
try (var connection = connectionFactory.getConnection()) {
byte[] rawKey = getRawKey(key);
byte[] rawValue = connection.get(rawKey);
if (rawValue == null) {
return null;
} else if (Arrays.equals(rawValue, BINARY_NULL_VALUE)) {
return SimpleTbCacheValueWrapper.empty();
} else {
V value = valueSerializer.deserialize(rawValue);
return SimpleTbCacheValueWrapper.wrap(value);
}
}
}
@Override
public void putIfAbsent(K key, V value) {
try (var connection = connectionFactory.getConnection()) {
putIfAbsent(connection, key, value);
}
}
@Override
public void evict(K key) {
try (var connection = connectionFactory.getConnection()) {
connection.del(getRawKey(key));
}
}
@Override
public void evictOrPut(K key, V value) {
try (var connection = connectionFactory.getConnection()) {
var rawKey = getRawKey(key);
var records = connection.del(rawKey);
if (records == null || records == 0) {
//We need to put the value in case of Redis, because evict will NOT cancel concurrent transaction used to "get" the missing value from cache.
connection.set(rawKey, getRawValue(value), evictExpiration, RedisStringCommands.SetOption.UPSERT);
}
}
}
@Override
public TbCacheTransaction<K, V> newTransactionForKey(K key) {
return null;
byte[][] rawKey = new byte[][]{getRawKey(key)};
RedisConnection connection = watch(rawKey);
return new RedisTbCacheTransaction<>(this, connection);
}
@Override
public TbCacheTransaction<K, V> newTransactionForKeys(List<K> keys) {
return null;
byte[][] rawKeysList = keys.stream().map(this::getRawKey).toArray(byte[][]::new);
RedisConnection connection = watch(rawKeysList);
return new RedisTbCacheTransaction<>(this, connection);
}
private RedisConnection watch(byte[][] rawKeysList) {
var connection = connectionFactory.getConnection();
try {
connection.watch(rawKeysList);
connection.multi();
} catch (Exception e) {
connection.close();
}
return connection;
}
private byte[] getRawKey(K key) {
String keyString = cacheName + key.toString();
byte[] rawKey;
try {
rawKey = keySerializer.serialize(keyString);
} catch (Exception e) {
log.warn("Failed to serialize the cache key: {}", key, e);
throw new RuntimeException(e);
}
if (rawKey == null) {
log.warn("Failed to serialize the cache key: {}", key);
throw new IllegalArgumentException("Failed to serialize the cache key!");
}
return rawKey;
}
private byte[] getRawValue(V value) {
if (value == null) {
return BINARY_NULL_VALUE;
} else {
try {
return valueSerializer.serialize(value);
} catch (Exception e) {
log.warn("Failed to serialize the cache value: {}", value, e);
throw new RuntimeException(e);
}
}
}
public void putIfAbsent(RedisConnection connection, K key, V value) {
byte[] rawKey = getRawKey(key);
byte[] rawValue = getRawValue(value);
connection.set(rawKey, rawValue, cacheTtl, RedisStringCommands.SetOption.SET_IF_ABSENT);
}
}

View File

@ -30,6 +30,14 @@ public class SimpleTbCacheValueWrapper<T> implements TbCacheValueWrapper<T> {
return value;
}
public static <T> SimpleTbCacheValueWrapper<T> empty() {
return new SimpleTbCacheValueWrapper<>(null);
}
public static <T> SimpleTbCacheValueWrapper<T> wrap(T value) {
return new SimpleTbCacheValueWrapper<>(value);
}
@SuppressWarnings("unchecked")
public static <T> SimpleTbCacheValueWrapper<T> wrap(Cache.ValueWrapper source) {
return source == null ? null : new SimpleTbCacheValueWrapper<>((T) source.get());

View File

@ -110,33 +110,30 @@ public class JpaAttributeDao extends JpaAbstractDaoListeningExecutorService impl
}
@Override
public ListenableFuture<Optional<AttributeKvEntry>> find(TenantId tenantId, EntityId entityId, String attributeType, String attributeKey) {
public Optional<AttributeKvEntry> find(TenantId tenantId, EntityId entityId, String attributeType, String attributeKey) {
AttributeKvCompositeKey compositeKey =
getAttributeKvCompositeKey(entityId, attributeType, attributeKey);
return Futures.immediateFuture(
Optional.ofNullable(DaoUtil.getData(attributeKvRepository.findById(compositeKey))));
return Optional.ofNullable(DaoUtil.getData(attributeKvRepository.findById(compositeKey)));
}
@Override
public ListenableFuture<List<AttributeKvEntry>> find(TenantId tenantId, EntityId entityId, String attributeType, Collection<String> attributeKeys) {
public List<AttributeKvEntry> find(TenantId tenantId, EntityId entityId, String attributeType, Collection<String> attributeKeys) {
List<AttributeKvCompositeKey> compositeKeys =
attributeKeys
.stream()
.map(attributeKey ->
getAttributeKvCompositeKey(entityId, attributeType, attributeKey))
.collect(Collectors.toList());
return Futures.immediateFuture(
DaoUtil.convertDataList(Lists.newArrayList(attributeKvRepository.findAllById(compositeKeys))));
return DaoUtil.convertDataList(Lists.newArrayList(attributeKvRepository.findAllById(compositeKeys)));
}
@Override
public ListenableFuture<List<AttributeKvEntry>> findAll(TenantId tenantId, EntityId entityId, String attributeType) {
return Futures.immediateFuture(
DaoUtil.convertDataList(Lists.newArrayList(
public List<AttributeKvEntry> findAll(TenantId tenantId, EntityId entityId, String attributeType) {
return DaoUtil.convertDataList(Lists.newArrayList(
attributeKvRepository.findAllByEntityTypeAndEntityIdAndAttributeType(
entityId.getEntityType(),
entityId.getId(),
attributeType))));
attributeType)));
}
@Override

View File

@ -19,9 +19,11 @@ import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.extern.slf4j.Slf4j;
import org.junit.Assert;
import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.thingsboard.server.cache.TbTransactionalCache;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.TenantId;
@ -29,6 +31,7 @@ import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
import org.thingsboard.server.common.data.kv.StringDataEntry;
import org.thingsboard.server.dao.AbstractDaoServiceTest;
import org.thingsboard.server.dao.attributes.AttributeCacheKey;
import org.thingsboard.server.dao.attributes.CachedAttributesService;
import java.util.ArrayList;
@ -41,11 +44,15 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@Slf4j
public class AttributeServiceTest extends AbstractDaoServiceTest {
private static final String OLD_VALUE = "OLD VALUE";
private static final String NEW_VALUE = "NEW VALUE";
@Autowired
private TbTransactionalCache<AttributeCacheKey, AttributeKvEntry> cache;
@Autowired
private CachedAttributesService attributesService;
@ -57,6 +64,24 @@ public class AttributeServiceTest extends AbstractDaoServiceTest {
Assert.assertTrue(result.isEmpty());
}
@Test
public void testConcurrentTransaction() throws Exception {
var tenantId = new TenantId(UUID.randomUUID());
var deviceId = new DeviceId(UUID.randomUUID());
var scope = DataConstants.SERVER_SCOPE;
var key = "TEST";
var attrKey = new AttributeCacheKey(scope, deviceId, "TEST");
var oldValue = new BaseAttributeKvEntry(System.currentTimeMillis(), new StringDataEntry(key, OLD_VALUE));
var newValue = new BaseAttributeKvEntry(System.currentTimeMillis(), new StringDataEntry(key, NEW_VALUE));
var trx = cache.newTransactionForKey(attrKey);
cache.putIfAbsent(attrKey, newValue);
trx.putIfAbsent(attrKey, oldValue);
Assert.assertFalse(trx.commit());
Assert.assertEquals(NEW_VALUE, getAttributeValue(tenantId, deviceId, scope, key));
}
@Test
public void testConcurrentFetchAndUpdate() throws Exception {
var tenantId = new TenantId(UUID.randomUUID());
@ -173,6 +198,7 @@ public class AttributeServiceTest extends AbstractDaoServiceTest {
Optional<AttributeKvEntry> entry = attributesService.find(tenantId, deviceId, scope, key).get(10, TimeUnit.SECONDS);
return entry.orElseThrow(RuntimeException::new).getStrValue().orElse("Unknown");
} catch (Exception e) {
log.warn("Failed to get attribute", e.getCause());
throw new RuntimeException(e);
}
}
@ -182,6 +208,7 @@ public class AttributeServiceTest extends AbstractDaoServiceTest {
List<AttributeKvEntry> entry = attributesService.find(tenantId, deviceId, scope, keys).get(10, TimeUnit.SECONDS);
return entry.stream().map(e -> e.getStrValue().orElse(null)).collect(Collectors.toList());
} catch (Exception e) {
log.warn("Failed to get attributes", e.getCause());
throw new RuntimeException(e);
}
}
@ -191,6 +218,7 @@ public class AttributeServiceTest extends AbstractDaoServiceTest {
AttributeKvEntry newEntry = new BaseAttributeKvEntry(System.currentTimeMillis(), new StringDataEntry(key, s));
attributesService.save(tenantId, deviceId, scope, Collections.singletonList(newEntry)).get(10, TimeUnit.SECONDS);
} catch (Exception e) {
log.warn("Failed to save attribute", e.getCause());
Assert.assertNull(e);
}
}

View File

@ -17,12 +17,16 @@ package org.thingsboard.server.dao.sql.attributes;
import lombok.extern.slf4j.Slf4j;
import org.junit.ClassRule;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContextInitializer;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.support.TestPropertySourceUtils;
import org.testcontainers.containers.GenericContainer;
import org.thingsboard.server.cache.TbTransactionalCache;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.dao.attributes.AttributeCacheKey;
@TestPropertySource(properties = {
"cache.type=redis", "redis.connection.type=standalone"
@ -32,7 +36,7 @@ import org.testcontainers.containers.GenericContainer;
public class RedisAttributeServiceTest extends AttributeServiceTest implements ApplicationContextInitializer<ConfigurableApplicationContext> {
@ClassRule
public static GenericContainer redis = new GenericContainer("redis:4.0").withExposedPorts(6379);
public static GenericContainer redis = new GenericContainer("redis:latest").withExposedPorts(6379);
@Override
public void initialize(ConfigurableApplicationContext applicationContext) {
@ -42,4 +46,5 @@ public class RedisAttributeServiceTest extends AttributeServiceTest implements A
applicationContext, "redis.standalone.port=" + redis.getMappedPort(6379));
}
}

View File

@ -14,50 +14,50 @@ cache.maximumPoolSize=16
cache.attributes.enabled=true
#cache.type=redis
caffeine.specs.relations.timeToLiveInMinutes=1440
caffeine.specs.relations.maxSize=100000
cache.specs.relations.timeToLiveInMinutes=1440
cache.specs.relations.maxSize=100000
caffeine.specs.deviceCredentials.timeToLiveInMinutes=1440
caffeine.specs.deviceCredentials.maxSize=100000
cache.specs.deviceCredentials.timeToLiveInMinutes=1440
cache.specs.deviceCredentials.maxSize=100000
caffeine.specs.devices.timeToLiveInMinutes=1440
caffeine.specs.devices.maxSize=100000
cache.specs.devices.timeToLiveInMinutes=1440
cache.specs.devices.maxSize=100000
caffeine.specs.sessions.timeToLiveInMinutes=1440
caffeine.specs.sessions.maxSize=100000
cache.specs.sessions.timeToLiveInMinutes=1440
cache.specs.sessions.maxSize=100000
caffeine.specs.assets.timeToLiveInMinutes=1440
caffeine.specs.assets.maxSize=100000
cache.specs.assets.timeToLiveInMinutes=1440
cache.specs.assets.maxSize=100000
caffeine.specs.entityViews.timeToLiveInMinutes=1440
caffeine.specs.entityViews.maxSize=100000
cache.specs.entityViews.timeToLiveInMinutes=1440
cache.specs.entityViews.maxSize=100000
caffeine.specs.claimDevices.timeToLiveInMinutes=1440
caffeine.specs.claimDevices.maxSize=100000
cache.specs.claimDevices.timeToLiveInMinutes=1440
cache.specs.claimDevices.maxSize=100000
caffeine.specs.securitySettings.timeToLiveInMinutes=1440
caffeine.specs.securitySettings.maxSize=100000
cache.specs.securitySettings.timeToLiveInMinutes=1440
cache.specs.securitySettings.maxSize=100000
caffeine.specs.tenantProfiles.timeToLiveInMinutes=1440
caffeine.specs.tenantProfiles.maxSize=100000
cache.specs.tenantProfiles.timeToLiveInMinutes=1440
cache.specs.tenantProfiles.maxSize=100000
caffeine.specs.deviceProfiles.timeToLiveInMinutes=1440
caffeine.specs.deviceProfiles.maxSize=100000
cache.specs.deviceProfiles.timeToLiveInMinutes=1440
cache.specs.deviceProfiles.maxSize=100000
caffeine.specs.attributes.timeToLiveInMinutes=1440
caffeine.specs.attributes.maxSize=100000
cache.specs.attributes.timeToLiveInMinutes=1440
cache.specs.attributes.maxSize=100000
caffeine.specs.tokensOutdatageTime.timeToLiveInMinutes=1440
caffeine.specs.tokensOutdatageTime.maxSize=100000
cache.specs.tokensOutdatageTime.timeToLiveInMinutes=1440
cache.specs.tokensOutdatageTime.maxSize=100000
caffeine.specs.otaPackages.timeToLiveInMinutes=1440
caffeine.specs.otaPackages.maxSize=100000
cache.specs.otaPackages.timeToLiveInMinutes=1440
cache.specs.otaPackages.maxSize=100000
caffeine.specs.otaPackagesData.timeToLiveInMinutes=1440
caffeine.specs.otaPackagesData.maxSize=100000
cache.specs.otaPackagesData.timeToLiveInMinutes=1440
cache.specs.otaPackagesData.maxSize=100000
caffeine.specs.edges.timeToLiveInMinutes=1440
caffeine.specs.edges.maxSize=100000
cache.specs.edges.timeToLiveInMinutes=1440
cache.specs.edges.maxSize=100000
redis.connection.host=localhost