fixed insert result and used expiration

This commit is contained in:
YevhenBondarenko 2024-07-01 12:24:27 +02:00
parent 118407d982
commit 6162060e88
13 changed files with 39 additions and 59 deletions

View File

@ -54,8 +54,8 @@ public abstract class RedisTbTransactionalCache<K extends Serializable, V extend
private final JedisConnectionFactory connectionFactory;
private final RedisSerializer<String> keySerializer = StringRedisSerializer.UTF_8;
private final TbRedisSerializer<K, V> valueSerializer;
private final Expiration evictExpiration;
private final Expiration cacheTtl;
protected final Expiration evictExpiration;
protected final Expiration cacheTtl;
public RedisTbTransactionalCache(String cacheName,
CacheSpecsMap cacheSpecsMap,

View File

@ -26,7 +26,7 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@RequiredArgsConstructor
public abstract class VersionedCaffeineTbTransactionalCache<K extends Serializable, V extends Serializable & HasVersion> implements VersionedTbTransactionalCache<K, V> {
public abstract class VersionedCaffeineTbCache<K extends Serializable, V extends Serializable & HasVersion> implements VersionedTbCache<K, V> {
private final CacheManager cacheManager;
private final String cacheName;
@ -53,7 +53,7 @@ public abstract class VersionedCaffeineTbTransactionalCache<K extends Serializab
try {
TbPair<Long, V> versionValuePair = doGet(key);
Long currentVersion = versionValuePair.getFirst();
if (currentVersion == null || version >= currentVersion) {
if (currentVersion == null || version > currentVersion) {
cacheManager.getCache(cacheName).put(key, TbPair.of(version, value));
}
} finally {

View File

@ -22,6 +22,7 @@ import org.springframework.dao.InvalidDataAccessApiUsageException;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.ReturnType;
import org.springframework.data.redis.core.types.Expiration;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.thingsboard.server.common.data.HasVersion;
@ -31,34 +32,29 @@ import java.util.Collection;
import java.util.List;
@Slf4j
public abstract class VersionedRedisTbTransactionalCache<K extends Serializable, V extends Serializable & HasVersion> extends RedisTbTransactionalCache<K, V> implements VersionedTbTransactionalCache<K, V> {
public abstract class VersionedRedisTbCache<K extends Serializable, V extends Serializable & HasVersion> extends RedisTbTransactionalCache<K, V> implements VersionedTbCache<K, V> {
private static final int VERSION_SIZE = 8;
private static final int VALUE_END_OFFSET = -1;
static final byte[] SET_VERSIONED_VALUE_LUA_SCRIPT = StringRedisSerializer.UTF_8.serialize("""
-- KEYS[1] is the key
-- ARGV[1] is the new value
-- ARGV[2] is the new version
local key = KEYS[1]
local newValue = ARGV[1]
local newVersion = tonumber(ARGV[2])
local expiration = tonumber(ARGV[3])
-- Function to set the new value with the version
local function setNewValue()
local newValueWithVersion = struct.pack(">I8", newVersion) .. newValue:sub(9)
redis.call('SET', key, newValueWithVersion)
redis.call('SET', key, newValueWithVersion, 'EX', expiration)
end
-- Get the current version (first 8 bytes) of the current value
local currentVersionBytes = redis.call('GETRANGE', key, 0, 7)
if currentVersionBytes and #currentVersionBytes == 8 then
-- Extract the current version from the first 8 bytes
local currentVersion = tonumber(struct.unpack(">I8", currentVersionBytes))
if newVersion >= currentVersion then
if newVersion > currentVersion then
setNewValue()
end
else
@ -66,9 +62,9 @@ public abstract class VersionedRedisTbTransactionalCache<K extends Serializable,
setNewValue()
end
""");
static final byte[] SET_VERSIONED_VALUE_SHA = StringRedisSerializer.UTF_8.serialize("041b109dd56f6c8afb55090076e754727a5d3da0");
static final byte[] SET_VERSIONED_VALUE_SHA = StringRedisSerializer.UTF_8.serialize("1d0cb3f1d1f899b8e456789fc5000196d5bb3025");
public VersionedRedisTbTransactionalCache(String cacheName, CacheSpecsMap cacheSpecsMap, RedisConnectionFactory connectionFactory, TBRedisCacheConfiguration configuration, TbRedisSerializer<K, V> valueSerializer) {
public VersionedRedisTbCache(String cacheName, CacheSpecsMap cacheSpecsMap, RedisConnectionFactory connectionFactory, TBRedisCacheConfiguration configuration, TbRedisSerializer<K, V> valueSerializer) {
super(cacheName, cacheSpecsMap, connectionFactory, configuration, valueSerializer);
}
@ -92,23 +88,27 @@ public abstract class VersionedRedisTbTransactionalCache<K extends Serializable,
@Override
public void put(K key, V value) {
Long version = value!= null ? value.getVersion() : 0;
Long version = value != null ? value.getVersion() : 0;
put(key, value, version);
}
@Override
public void put(K key, V value, Long version) {
//TODO: use expiration
log.trace("put [{}][{}][{}]", key, value, version);
doPut(key, value, version, cacheTtl);
}
private void doPut(K key, V value, Long version, Expiration expiration) {
if (version == null) {
return;
}
final byte[] rawKey = getRawKey(key);
byte[] rawValue = getRawValue(value);
byte[] rawVersion = StringRedisSerializer.UTF_8.serialize(String.valueOf(version));
byte[] rawExpiration = StringRedisSerializer.UTF_8.serialize(String.valueOf(expiration.getExpirationTimeInSeconds()));
try (var connection = getConnection(rawKey)) {
byte[] rawValue = getRawValue(value);
byte[] rawVersion = StringRedisSerializer.UTF_8.serialize(String.valueOf(version));
try {
connection.scriptingCommands().evalSha(SET_VERSIONED_VALUE_SHA, ReturnType.VALUE, 1, rawKey, rawValue, rawVersion);
connection.scriptingCommands().evalSha(SET_VERSIONED_VALUE_SHA, ReturnType.VALUE, 1, rawKey, rawValue, rawVersion, rawExpiration);
} catch (InvalidDataAccessApiUsageException e) {
log.debug("loading LUA [{}]", connection.getNativeConnection());
String sha = connection.scriptingCommands().scriptLoad(SET_VERSIONED_VALUE_LUA_SCRIPT);
@ -116,10 +116,10 @@ public abstract class VersionedRedisTbTransactionalCache<K extends Serializable,
log.error("SHA for SET_VERSIONED_VALUE_LUA_SCRIPT wrong! Expected [{}], but actual [{}]", new String(SET_VERSIONED_VALUE_SHA), sha);
}
try {
connection.scriptingCommands().evalSha(SET_VERSIONED_VALUE_SHA, ReturnType.VALUE, 1, rawKey, rawValue, rawVersion);
connection.scriptingCommands().evalSha(SET_VERSIONED_VALUE_SHA, ReturnType.VALUE, 1, rawKey, rawValue, rawVersion, rawExpiration);
} catch (InvalidDataAccessApiUsageException ignored) {
log.debug("Slowly executing eval instead of fast evalsha");
connection.scriptingCommands().eval(SET_VERSIONED_VALUE_LUA_SCRIPT, ReturnType.VALUE, 1, rawKey, rawValue, rawVersion);
connection.scriptingCommands().eval(SET_VERSIONED_VALUE_LUA_SCRIPT, ReturnType.VALUE, 1, rawKey, rawValue, rawVersion, rawExpiration);
}
}
}
@ -129,8 +129,7 @@ public abstract class VersionedRedisTbTransactionalCache<K extends Serializable,
public void evict(K key, Long version) {
log.trace("evict [{}][{}]", key, version);
if (version != null) {
//TODO: use evict expiration
put(key, null, version);
doPut(key, null, version, evictExpiration);
}
}

View File

@ -19,7 +19,7 @@ import org.thingsboard.server.common.data.HasVersion;
import java.io.Serializable;
public interface VersionedTbTransactionalCache<K extends Serializable, V extends Serializable & HasVersion> {
public interface VersionedTbCache<K extends Serializable, V extends Serializable & HasVersion> {
TbCacheValueWrapper<V> get(K key);

View File

@ -68,7 +68,7 @@ public abstract class AbstractVersionedInsertRepository<T> extends AbstractInser
seqNumbersList = keyHolder.getKeyList();
for (int i = 0; i < insertResult.length; i++) {
if (updateResult[i] != 0) {
if (insertResult[i] != 0) {
seqNumbers.set(toInsertIndexes.get(i), (Long) seqNumbersList.get(i).get(VERSION_COLUMN));
}
}

View File

@ -18,13 +18,13 @@ package org.thingsboard.server.dao.attributes;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.cache.CacheManager;
import org.springframework.stereotype.Service;
import org.thingsboard.server.cache.VersionedCaffeineTbTransactionalCache;
import org.thingsboard.server.cache.VersionedCaffeineTbCache;
import org.thingsboard.server.common.data.CacheConstants;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
@ConditionalOnProperty(prefix = "cache", value = "type", havingValue = "caffeine", matchIfMissing = true)
@Service("AttributeCache")
public class AttributeCaffeineCache extends VersionedCaffeineTbTransactionalCache<AttributeCacheKey, AttributeKvEntry> {
public class AttributeCaffeineCache extends VersionedCaffeineTbCache<AttributeCacheKey, AttributeKvEntry> {
public AttributeCaffeineCache(CacheManager cacheManager) {
super(cacheManager, CacheConstants.ATTRIBUTES_CACHE);

View File

@ -23,7 +23,7 @@ import org.springframework.stereotype.Service;
import org.thingsboard.server.cache.CacheSpecsMap;
import org.thingsboard.server.cache.TBRedisCacheConfiguration;
import org.thingsboard.server.cache.TbRedisSerializer;
import org.thingsboard.server.cache.VersionedRedisTbTransactionalCache;
import org.thingsboard.server.cache.VersionedRedisTbCache;
import org.thingsboard.server.common.data.CacheConstants;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
@ -38,7 +38,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.KeyValueType;
@ConditionalOnProperty(prefix = "cache", value = "type", havingValue = "redis")
@Service("AttributeCache")
public class AttributeRedisCache extends VersionedRedisTbTransactionalCache<AttributeCacheKey, AttributeKvEntry> {
public class AttributeRedisCache extends VersionedRedisTbCache<AttributeCacheKey, AttributeKvEntry> {
public AttributeRedisCache(TBRedisCacheConfiguration configuration, CacheSpecsMap cacheSpecsMap, RedisConnectionFactory connectionFactory) {
super(CacheConstants.ATTRIBUTES_CACHE, cacheSpecsMap, connectionFactory, configuration, new TbRedisSerializer<>() {

View File

@ -27,7 +27,7 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Primary;
import org.springframework.stereotype.Service;
import org.thingsboard.server.cache.TbCacheValueWrapper;
import org.thingsboard.server.cache.VersionedTbTransactionalCache;
import org.thingsboard.server.cache.VersionedTbCache;
import org.thingsboard.server.common.data.AttributeScope;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.id.DeviceProfileId;
@ -68,7 +68,7 @@ public class CachedAttributesService implements AttributesService {
private final CacheExecutorService cacheExecutorService;
private final DefaultCounter hitCounter;
private final DefaultCounter missCounter;
private final VersionedTbTransactionalCache<AttributeCacheKey, AttributeKvEntry> cache;
private final VersionedTbCache<AttributeCacheKey, AttributeKvEntry> cache;
private ListeningExecutorService cacheExecutor;
@Value("${cache.type:caffeine}")
@ -80,7 +80,7 @@ public class CachedAttributesService implements AttributesService {
JpaExecutorService jpaExecutorService,
StatsFactory statsFactory,
CacheExecutorService cacheExecutorService,
VersionedTbTransactionalCache<AttributeCacheKey, AttributeKvEntry> cache) {
VersionedTbCache<AttributeCacheKey, AttributeKvEntry> cache) {
this.attributesDao = attributesDao;
this.jpaExecutorService = jpaExecutorService;
this.cacheExecutorService = cacheExecutorService;

View File

@ -37,7 +37,7 @@ public interface AttributeKvRepository extends JpaRepository<AttributeKvEntity,
@Modifying
@Query(value = "DELETE FROM attribute_kv WHERE entity_id = :entityId " +
"AND attribute_type = :attributeType " +
"AND attribute_key = :attributeKey RETURNING version", nativeQuery = true)
"AND attribute_key = :attributeKey RETURNING nextval('attribute_kv_version_seq')", nativeQuery = true)
Long delete(@Param("entityId") UUID entityId,
@Param("attributeType") int attributeType,
@Param("attributeKey") int attributeKey);
@ -60,4 +60,3 @@ public interface AttributeKvRepository extends JpaRepository<AttributeKvEntity,
List<Integer> findAllKeysByEntityIdsAndAttributeType(@Param("entityIds") List<UUID> entityIds,
@Param("attributeType") int attributeType);
}

View File

@ -114,7 +114,7 @@ CREATE TABLE IF NOT EXISTS attribute_kv (
dbl_v double precision,
json_v json,
last_update_ts bigint,
version bigint default 1,
version bigint default 0,
CONSTRAINT attribute_kv_pkey PRIMARY KEY (entity_id, attribute_type, attribute_key)
);
@ -553,7 +553,7 @@ CREATE TABLE IF NOT EXISTS ts_kv_latest
long_v bigint,
dbl_v double precision,
json_v json,
version bigint default 1,
version bigint default 0,
CONSTRAINT ts_kv_latest_pkey PRIMARY KEY (entity_id, key)
);

View File

@ -45,7 +45,7 @@ CREATE TABLE IF NOT EXISTS ts_kv_latest (
long_v bigint,
dbl_v double precision,
json_v json,
version bigint default 1,
version bigint default 0,
CONSTRAINT ts_kv_latest_pkey PRIMARY KEY (entity_id, key)
);

View File

@ -26,6 +26,6 @@ CREATE TABLE IF NOT EXISTS ts_kv_latest
long_v bigint,
dbl_v double precision,
json_v json,
version bigint default 1,
version bigint default 0,
CONSTRAINT ts_kv_latest_pkey PRIMARY KEY (entity_id, key)
);

View File

@ -26,7 +26,7 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.thingsboard.server.cache.TbTransactionalCache;
import org.thingsboard.server.cache.VersionedTbCache;
import org.thingsboard.server.common.data.AttributeScope;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.TenantId;
@ -60,7 +60,7 @@ public abstract class BaseAttributesServiceTest extends AbstractServiceTest {
private static final String NEW_VALUE = "NEW VALUE";
@Autowired
private TbTransactionalCache<AttributeCacheKey, AttributeKvEntry> cache;
private VersionedTbCache<AttributeCacheKey, AttributeKvEntry> cache;
@Autowired
private AttributesService attributesService;
@ -132,24 +132,6 @@ public abstract class BaseAttributesServiceTest extends AbstractServiceTest {
Assert.assertTrue(result.isEmpty());
}
@Test
public void testConcurrentTransaction() throws Exception {
var tenantId = new TenantId(UUID.randomUUID());
var deviceId = new DeviceId(UUID.randomUUID());
var scope = AttributeScope.SERVER_SCOPE;
var key = "TEST";
var attrKey = new AttributeCacheKey(scope, deviceId, "TEST");
var oldValue = new BaseAttributeKvEntry(System.currentTimeMillis(), new StringDataEntry(key, OLD_VALUE));
var newValue = new BaseAttributeKvEntry(System.currentTimeMillis(), new StringDataEntry(key, NEW_VALUE));
var trx = cache.newTransactionForKey(attrKey);
cache.putIfAbsent(attrKey, newValue);
trx.putIfAbsent(attrKey, oldValue);
Assert.assertFalse(trx.commit());
Assert.assertEquals(NEW_VALUE, getAttributeValue(tenantId, deviceId, scope, key));
}
@Test
public void testConcurrentFetchAndUpdate() throws Exception {
var tenantId = new TenantId(UUID.randomUUID());