diff --git a/common/cache/src/main/java/org/thingsboard/server/cache/RedisTbTransactionalCache.java b/common/cache/src/main/java/org/thingsboard/server/cache/RedisTbTransactionalCache.java index 543739aad4..e33e13caa1 100644 --- a/common/cache/src/main/java/org/thingsboard/server/cache/RedisTbTransactionalCache.java +++ b/common/cache/src/main/java/org/thingsboard/server/cache/RedisTbTransactionalCache.java @@ -54,8 +54,8 @@ public abstract class RedisTbTransactionalCache keySerializer = StringRedisSerializer.UTF_8; private final TbRedisSerializer valueSerializer; - private final Expiration evictExpiration; - private final Expiration cacheTtl; + protected final Expiration evictExpiration; + protected final Expiration cacheTtl; public RedisTbTransactionalCache(String cacheName, CacheSpecsMap cacheSpecsMap, diff --git a/common/cache/src/main/java/org/thingsboard/server/cache/VersionedCaffeineTbTransactionalCache.java b/common/cache/src/main/java/org/thingsboard/server/cache/VersionedCaffeineTbCache.java similarity index 91% rename from common/cache/src/main/java/org/thingsboard/server/cache/VersionedCaffeineTbTransactionalCache.java rename to common/cache/src/main/java/org/thingsboard/server/cache/VersionedCaffeineTbCache.java index f2120cf38a..e084668b90 100644 --- a/common/cache/src/main/java/org/thingsboard/server/cache/VersionedCaffeineTbTransactionalCache.java +++ b/common/cache/src/main/java/org/thingsboard/server/cache/VersionedCaffeineTbCache.java @@ -26,7 +26,7 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @RequiredArgsConstructor -public abstract class VersionedCaffeineTbTransactionalCache implements VersionedTbTransactionalCache { +public abstract class VersionedCaffeineTbCache implements VersionedTbCache { private final CacheManager cacheManager; private final String cacheName; @@ -53,7 +53,7 @@ public abstract class VersionedCaffeineTbTransactionalCache versionValuePair = doGet(key); Long currentVersion = versionValuePair.getFirst(); - if (currentVersion == null || version >= currentVersion) { + if (currentVersion == null || version > currentVersion) { cacheManager.getCache(cacheName).put(key, TbPair.of(version, value)); } } finally { diff --git a/common/cache/src/main/java/org/thingsboard/server/cache/VersionedRedisTbTransactionalCache.java b/common/cache/src/main/java/org/thingsboard/server/cache/VersionedRedisTbCache.java similarity index 82% rename from common/cache/src/main/java/org/thingsboard/server/cache/VersionedRedisTbTransactionalCache.java rename to common/cache/src/main/java/org/thingsboard/server/cache/VersionedRedisTbCache.java index 34b46d535a..5e9ba187c9 100644 --- a/common/cache/src/main/java/org/thingsboard/server/cache/VersionedRedisTbTransactionalCache.java +++ b/common/cache/src/main/java/org/thingsboard/server/cache/VersionedRedisTbCache.java @@ -22,6 +22,7 @@ import org.springframework.dao.InvalidDataAccessApiUsageException; import org.springframework.data.redis.connection.RedisConnection; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.connection.ReturnType; +import org.springframework.data.redis.core.types.Expiration; import org.springframework.data.redis.serializer.StringRedisSerializer; import org.thingsboard.server.common.data.HasVersion; @@ -31,34 +32,29 @@ import java.util.Collection; import java.util.List; @Slf4j -public abstract class VersionedRedisTbTransactionalCache extends RedisTbTransactionalCache implements VersionedTbTransactionalCache { +public abstract class VersionedRedisTbCache extends RedisTbTransactionalCache implements VersionedTbCache { private static final int VERSION_SIZE = 8; private static final int VALUE_END_OFFSET = -1; static final byte[] SET_VERSIONED_VALUE_LUA_SCRIPT = StringRedisSerializer.UTF_8.serialize(""" - -- KEYS[1] is the key - -- ARGV[1] is the new value - -- ARGV[2] is the new version - local key = KEYS[1] local newValue = ARGV[1] local newVersion = tonumber(ARGV[2]) + local expiration = tonumber(ARGV[3]) - -- Function to set the new value with the version local function setNewValue() local newValueWithVersion = struct.pack(">I8", newVersion) .. newValue:sub(9) - redis.call('SET', key, newValueWithVersion) + redis.call('SET', key, newValueWithVersion, 'EX', expiration) end -- Get the current version (first 8 bytes) of the current value local currentVersionBytes = redis.call('GETRANGE', key, 0, 7) if currentVersionBytes and #currentVersionBytes == 8 then - -- Extract the current version from the first 8 bytes local currentVersion = tonumber(struct.unpack(">I8", currentVersionBytes)) - if newVersion >= currentVersion then + if newVersion > currentVersion then setNewValue() end else @@ -66,9 +62,9 @@ public abstract class VersionedRedisTbTransactionalCache valueSerializer) { + public VersionedRedisTbCache(String cacheName, CacheSpecsMap cacheSpecsMap, RedisConnectionFactory connectionFactory, TBRedisCacheConfiguration configuration, TbRedisSerializer valueSerializer) { super(cacheName, cacheSpecsMap, connectionFactory, configuration, valueSerializer); } @@ -92,23 +88,27 @@ public abstract class VersionedRedisTbTransactionalCache { +public interface VersionedTbCache { TbCacheValueWrapper get(K key); diff --git a/dao/src/main/java/org/thingsboard/server/dao/AbstractVersionedInsertRepository.java b/dao/src/main/java/org/thingsboard/server/dao/AbstractVersionedInsertRepository.java index 6380957f77..43ed41e52a 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/AbstractVersionedInsertRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/AbstractVersionedInsertRepository.java @@ -68,7 +68,7 @@ public abstract class AbstractVersionedInsertRepository extends AbstractInser seqNumbersList = keyHolder.getKeyList(); for (int i = 0; i < insertResult.length; i++) { - if (updateResult[i] != 0) { + if (insertResult[i] != 0) { seqNumbers.set(toInsertIndexes.get(i), (Long) seqNumbersList.get(i).get(VERSION_COLUMN)); } } diff --git a/dao/src/main/java/org/thingsboard/server/dao/attributes/AttributeCaffeineCache.java b/dao/src/main/java/org/thingsboard/server/dao/attributes/AttributeCaffeineCache.java index e00ea8d11c..140cfbc03e 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/attributes/AttributeCaffeineCache.java +++ b/dao/src/main/java/org/thingsboard/server/dao/attributes/AttributeCaffeineCache.java @@ -18,13 +18,13 @@ package org.thingsboard.server.dao.attributes; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.cache.CacheManager; import org.springframework.stereotype.Service; -import org.thingsboard.server.cache.VersionedCaffeineTbTransactionalCache; +import org.thingsboard.server.cache.VersionedCaffeineTbCache; import org.thingsboard.server.common.data.CacheConstants; import org.thingsboard.server.common.data.kv.AttributeKvEntry; @ConditionalOnProperty(prefix = "cache", value = "type", havingValue = "caffeine", matchIfMissing = true) @Service("AttributeCache") -public class AttributeCaffeineCache extends VersionedCaffeineTbTransactionalCache { +public class AttributeCaffeineCache extends VersionedCaffeineTbCache { public AttributeCaffeineCache(CacheManager cacheManager) { super(cacheManager, CacheConstants.ATTRIBUTES_CACHE); diff --git a/dao/src/main/java/org/thingsboard/server/dao/attributes/AttributeRedisCache.java b/dao/src/main/java/org/thingsboard/server/dao/attributes/AttributeRedisCache.java index 68caf62ceb..a6f21f9c3a 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/attributes/AttributeRedisCache.java +++ b/dao/src/main/java/org/thingsboard/server/dao/attributes/AttributeRedisCache.java @@ -23,7 +23,7 @@ import org.springframework.stereotype.Service; import org.thingsboard.server.cache.CacheSpecsMap; import org.thingsboard.server.cache.TBRedisCacheConfiguration; import org.thingsboard.server.cache.TbRedisSerializer; -import org.thingsboard.server.cache.VersionedRedisTbTransactionalCache; +import org.thingsboard.server.cache.VersionedRedisTbCache; import org.thingsboard.server.common.data.CacheConstants; import org.thingsboard.server.common.data.kv.AttributeKvEntry; import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry; @@ -38,7 +38,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.KeyValueType; @ConditionalOnProperty(prefix = "cache", value = "type", havingValue = "redis") @Service("AttributeCache") -public class AttributeRedisCache extends VersionedRedisTbTransactionalCache { +public class AttributeRedisCache extends VersionedRedisTbCache { public AttributeRedisCache(TBRedisCacheConfiguration configuration, CacheSpecsMap cacheSpecsMap, RedisConnectionFactory connectionFactory) { super(CacheConstants.ATTRIBUTES_CACHE, cacheSpecsMap, connectionFactory, configuration, new TbRedisSerializer<>() { diff --git a/dao/src/main/java/org/thingsboard/server/dao/attributes/CachedAttributesService.java b/dao/src/main/java/org/thingsboard/server/dao/attributes/CachedAttributesService.java index fb5c745a11..d31ff9db79 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/attributes/CachedAttributesService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/attributes/CachedAttributesService.java @@ -27,7 +27,7 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Primary; import org.springframework.stereotype.Service; import org.thingsboard.server.cache.TbCacheValueWrapper; -import org.thingsboard.server.cache.VersionedTbTransactionalCache; +import org.thingsboard.server.cache.VersionedTbCache; import org.thingsboard.server.common.data.AttributeScope; import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.data.id.DeviceProfileId; @@ -68,7 +68,7 @@ public class CachedAttributesService implements AttributesService { private final CacheExecutorService cacheExecutorService; private final DefaultCounter hitCounter; private final DefaultCounter missCounter; - private final VersionedTbTransactionalCache cache; + private final VersionedTbCache cache; private ListeningExecutorService cacheExecutor; @Value("${cache.type:caffeine}") @@ -80,7 +80,7 @@ public class CachedAttributesService implements AttributesService { JpaExecutorService jpaExecutorService, StatsFactory statsFactory, CacheExecutorService cacheExecutorService, - VersionedTbTransactionalCache cache) { + VersionedTbCache cache) { this.attributesDao = attributesDao; this.jpaExecutorService = jpaExecutorService; this.cacheExecutorService = cacheExecutorService; diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/AttributeKvRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/AttributeKvRepository.java index beea5f2f2f..8367431946 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/AttributeKvRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/AttributeKvRepository.java @@ -37,7 +37,7 @@ public interface AttributeKvRepository extends JpaRepository findAllKeysByEntityIdsAndAttributeType(@Param("entityIds") List entityIds, @Param("attributeType") int attributeType); } - diff --git a/dao/src/main/resources/sql/schema-entities.sql b/dao/src/main/resources/sql/schema-entities.sql index c2e3a2b108..9e6ab22248 100644 --- a/dao/src/main/resources/sql/schema-entities.sql +++ b/dao/src/main/resources/sql/schema-entities.sql @@ -114,7 +114,7 @@ CREATE TABLE IF NOT EXISTS attribute_kv ( dbl_v double precision, json_v json, last_update_ts bigint, - version bigint default 1, + version bigint default 0, CONSTRAINT attribute_kv_pkey PRIMARY KEY (entity_id, attribute_type, attribute_key) ); @@ -553,7 +553,7 @@ CREATE TABLE IF NOT EXISTS ts_kv_latest long_v bigint, dbl_v double precision, json_v json, - version bigint default 1, + version bigint default 0, CONSTRAINT ts_kv_latest_pkey PRIMARY KEY (entity_id, key) ); diff --git a/dao/src/main/resources/sql/schema-timescale.sql b/dao/src/main/resources/sql/schema-timescale.sql index 7c2a99fba3..1d5b67a95e 100644 --- a/dao/src/main/resources/sql/schema-timescale.sql +++ b/dao/src/main/resources/sql/schema-timescale.sql @@ -45,7 +45,7 @@ CREATE TABLE IF NOT EXISTS ts_kv_latest ( long_v bigint, dbl_v double precision, json_v json, - version bigint default 1, + version bigint default 0, CONSTRAINT ts_kv_latest_pkey PRIMARY KEY (entity_id, key) ); diff --git a/dao/src/main/resources/sql/schema-ts-latest-psql.sql b/dao/src/main/resources/sql/schema-ts-latest-psql.sql index d50571c091..4892e40176 100644 --- a/dao/src/main/resources/sql/schema-ts-latest-psql.sql +++ b/dao/src/main/resources/sql/schema-ts-latest-psql.sql @@ -26,6 +26,6 @@ CREATE TABLE IF NOT EXISTS ts_kv_latest long_v bigint, dbl_v double precision, json_v json, - version bigint default 1, + version bigint default 0, CONSTRAINT ts_kv_latest_pkey PRIMARY KEY (entity_id, key) ); diff --git a/dao/src/test/java/org/thingsboard/server/dao/service/attributes/BaseAttributesServiceTest.java b/dao/src/test/java/org/thingsboard/server/dao/service/attributes/BaseAttributesServiceTest.java index 98a8cf9d47..ad2470a9b1 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/service/attributes/BaseAttributesServiceTest.java +++ b/dao/src/test/java/org/thingsboard/server/dao/service/attributes/BaseAttributesServiceTest.java @@ -26,7 +26,7 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.springframework.beans.factory.annotation.Autowired; -import org.thingsboard.server.cache.TbTransactionalCache; +import org.thingsboard.server.cache.VersionedTbCache; import org.thingsboard.server.common.data.AttributeScope; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.TenantId; @@ -60,7 +60,7 @@ public abstract class BaseAttributesServiceTest extends AbstractServiceTest { private static final String NEW_VALUE = "NEW VALUE"; @Autowired - private TbTransactionalCache cache; + private VersionedTbCache cache; @Autowired private AttributesService attributesService; @@ -132,24 +132,6 @@ public abstract class BaseAttributesServiceTest extends AbstractServiceTest { Assert.assertTrue(result.isEmpty()); } - @Test - public void testConcurrentTransaction() throws Exception { - var tenantId = new TenantId(UUID.randomUUID()); - var deviceId = new DeviceId(UUID.randomUUID()); - var scope = AttributeScope.SERVER_SCOPE; - var key = "TEST"; - - var attrKey = new AttributeCacheKey(scope, deviceId, "TEST"); - var oldValue = new BaseAttributeKvEntry(System.currentTimeMillis(), new StringDataEntry(key, OLD_VALUE)); - var newValue = new BaseAttributeKvEntry(System.currentTimeMillis(), new StringDataEntry(key, NEW_VALUE)); - - var trx = cache.newTransactionForKey(attrKey); - cache.putIfAbsent(attrKey, newValue); - trx.putIfAbsent(attrKey, oldValue); - Assert.assertFalse(trx.commit()); - Assert.assertEquals(NEW_VALUE, getAttributeValue(tenantId, deviceId, scope, key)); - } - @Test public void testConcurrentFetchAndUpdate() throws Exception { var tenantId = new TenantId(UUID.randomUUID());