implemented versioned cache

This commit is contained in:
YevhenBondarenko 2024-06-26 15:51:32 +02:00
parent e57b9471dc
commit 118407d982
25 changed files with 492 additions and 104 deletions

View File

@ -29,7 +29,6 @@ import org.springframework.data.redis.core.types.Expiration;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.thingsboard.server.common.data.FstStatsService;
import redis.clients.jedis.Connection;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.util.JedisClusterCRC16;
@ -44,7 +43,7 @@ import java.util.concurrent.TimeUnit;
@Slf4j
public abstract class RedisTbTransactionalCache<K extends Serializable, V extends Serializable> implements TbTransactionalCache<K, V> {
private static final byte[] BINARY_NULL_VALUE = RedisSerializer.java().serialize(NullValue.INSTANCE);
static final byte[] BINARY_NULL_VALUE = RedisSerializer.java().serialize(NullValue.INSTANCE);
static final JedisPool MOCK_POOL = new JedisPool(); //non-null pool required for JedisConnection to trigger closing jedis connection
@Autowired
@ -79,7 +78,7 @@ public abstract class RedisTbTransactionalCache<K extends Serializable, V extend
public TbCacheValueWrapper<V> get(K key) {
try (var connection = connectionFactory.getConnection()) {
byte[] rawKey = getRawKey(key);
byte[] rawValue = connection.get(rawKey);
byte[] rawValue = doGet(connection, rawKey);
if (rawValue == null) {
return null;
} else if (Arrays.equals(rawValue, BINARY_NULL_VALUE)) {
@ -96,6 +95,10 @@ public abstract class RedisTbTransactionalCache<K extends Serializable, V extend
}
}
protected byte[] doGet(RedisConnection connection, byte[] rawKey) {
return connection.stringCommands().get(rawKey);
}
@Override
public void put(K key, V value) {
try (var connection = connectionFactory.getConnection()) {
@ -153,7 +156,7 @@ public abstract class RedisTbTransactionalCache<K extends Serializable, V extend
return new RedisTbCacheTransaction<>(this, connection);
}
private RedisConnection getConnection(byte[] rawKey) {
protected RedisConnection getConnection(byte[] rawKey) {
if (!connectionFactory.isRedisClusterAware()) {
return connectionFactory.getConnection();
}
@ -180,7 +183,7 @@ public abstract class RedisTbTransactionalCache<K extends Serializable, V extend
return connection;
}
private byte[] getRawKey(K key) {
protected byte[] getRawKey(K key) {
String keyString = cacheName + key.toString();
byte[] rawKey;
try {
@ -196,7 +199,7 @@ public abstract class RedisTbTransactionalCache<K extends Serializable, V extend
return rawKey;
}
private byte[] getRawValue(V value) {
protected byte[] getRawValue(V value) {
if (value == null) {
return BINARY_NULL_VALUE;
} else {

View File

@ -0,0 +1,91 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.cache;
import lombok.RequiredArgsConstructor;
import org.springframework.cache.Cache;
import org.springframework.cache.CacheManager;
import org.thingsboard.server.common.data.HasVersion;
import org.thingsboard.server.common.data.util.TbPair;
import java.io.Serializable;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@RequiredArgsConstructor
public abstract class VersionedCaffeineTbTransactionalCache<K extends Serializable, V extends Serializable & HasVersion> implements VersionedTbTransactionalCache<K, V> {
private final CacheManager cacheManager;
private final String cacheName;
private final Lock lock = new ReentrantLock();
@Override
public TbCacheValueWrapper<V> get(K key) {
return SimpleTbCacheValueWrapper.wrap(doGet(key).getSecond());
}
@Override
public void put(K key, V value) {
Long version = value != null ? value.getVersion() : 0;
put(key, value, version);
}
@Override
public void put(K key, V value, Long version) {
if (version == null) {
return;
}
lock.lock();
try {
TbPair<Long, V> versionValuePair = doGet(key);
Long currentVersion = versionValuePair.getFirst();
if (currentVersion == null || version >= currentVersion) {
cacheManager.getCache(cacheName).put(key, TbPair.of(version, value));
}
} finally {
lock.unlock();
}
}
private TbPair<Long, V> doGet(K key) {
Cache.ValueWrapper source = cacheManager.getCache(cacheName).get(key);
return source == null ? TbPair.emptyPair() : (TbPair<Long, V>) source.get();
}
@Override
public void evict(K key) {
lock.lock();
try {
cacheManager.getCache(cacheName).evict(key);
} finally {
lock.unlock();
}
}
@Override
public void evict(K key, Long version) {
if (version == null) {
return;
}
lock.lock();
try {
put(key, null, version);
} finally {
lock.unlock();
}
}
}

View File

@ -0,0 +1,163 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.cache;
import jakarta.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.NotImplementedException;
import org.springframework.dao.InvalidDataAccessApiUsageException;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.ReturnType;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.thingsboard.server.common.data.HasVersion;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
@Slf4j
public abstract class VersionedRedisTbTransactionalCache<K extends Serializable, V extends Serializable & HasVersion> extends RedisTbTransactionalCache<K, V> implements VersionedTbTransactionalCache<K, V> {
private static final int VERSION_SIZE = 8;
private static final int VALUE_END_OFFSET = -1;
static final byte[] SET_VERSIONED_VALUE_LUA_SCRIPT = StringRedisSerializer.UTF_8.serialize("""
-- KEYS[1] is the key
-- ARGV[1] is the new value
-- ARGV[2] is the new version
local key = KEYS[1]
local newValue = ARGV[1]
local newVersion = tonumber(ARGV[2])
-- Function to set the new value with the version
local function setNewValue()
local newValueWithVersion = struct.pack(">I8", newVersion) .. newValue:sub(9)
redis.call('SET', key, newValueWithVersion)
end
-- Get the current version (first 8 bytes) of the current value
local currentVersionBytes = redis.call('GETRANGE', key, 0, 7)
if currentVersionBytes and #currentVersionBytes == 8 then
-- Extract the current version from the first 8 bytes
local currentVersion = tonumber(struct.unpack(">I8", currentVersionBytes))
if newVersion >= currentVersion then
setNewValue()
end
else
-- If the current value is absent or the current version is not found, set the new value
setNewValue()
end
""");
static final byte[] SET_VERSIONED_VALUE_SHA = StringRedisSerializer.UTF_8.serialize("041b109dd56f6c8afb55090076e754727a5d3da0");
public VersionedRedisTbTransactionalCache(String cacheName, CacheSpecsMap cacheSpecsMap, RedisConnectionFactory connectionFactory, TBRedisCacheConfiguration configuration, TbRedisSerializer<K, V> valueSerializer) {
super(cacheName, cacheSpecsMap, connectionFactory, configuration, valueSerializer);
}
@PostConstruct
public void init() {
try (var connection = getConnection(SET_VERSIONED_VALUE_SHA)) {
log.debug("Loading LUA with expected SHA[{}], connection [{}]", new String(SET_VERSIONED_VALUE_SHA), connection.getNativeConnection());
String sha = connection.scriptingCommands().scriptLoad(SET_VERSIONED_VALUE_LUA_SCRIPT);
if (!Arrays.equals(SET_VERSIONED_VALUE_SHA, StringRedisSerializer.UTF_8.serialize(sha))) {
log.error("SHA for SET_VERSIONED_VALUE_LUA_SCRIPT wrong! Expected [{}], but actual [{}], connection [{}]", new String(SET_VERSIONED_VALUE_SHA), sha, connection.getNativeConnection());
}
} catch (Throwable t) {
log.error("Error on Redis versioned cache init", t);
}
}
@Override
protected byte[] doGet(RedisConnection connection, byte[] rawKey) {
return connection.stringCommands().getRange(rawKey, VERSION_SIZE, VALUE_END_OFFSET);
}
@Override
public void put(K key, V value) {
Long version = value!= null ? value.getVersion() : 0;
put(key, value, version);
}
@Override
public void put(K key, V value, Long version) {
//TODO: use expiration
log.trace("put [{}][{}][{}]", key, value, version);
if (version == null) {
return;
}
final byte[] rawKey = getRawKey(key);
try (var connection = getConnection(rawKey)) {
byte[] rawValue = getRawValue(value);
byte[] rawVersion = StringRedisSerializer.UTF_8.serialize(String.valueOf(version));
try {
connection.scriptingCommands().evalSha(SET_VERSIONED_VALUE_SHA, ReturnType.VALUE, 1, rawKey, rawValue, rawVersion);
} catch (InvalidDataAccessApiUsageException e) {
log.debug("loading LUA [{}]", connection.getNativeConnection());
String sha = connection.scriptingCommands().scriptLoad(SET_VERSIONED_VALUE_LUA_SCRIPT);
if (!Arrays.equals(SET_VERSIONED_VALUE_SHA, StringRedisSerializer.UTF_8.serialize(sha))) {
log.error("SHA for SET_VERSIONED_VALUE_LUA_SCRIPT wrong! Expected [{}], but actual [{}]", new String(SET_VERSIONED_VALUE_SHA), sha);
}
try {
connection.scriptingCommands().evalSha(SET_VERSIONED_VALUE_SHA, ReturnType.VALUE, 1, rawKey, rawValue, rawVersion);
} catch (InvalidDataAccessApiUsageException ignored) {
log.debug("Slowly executing eval instead of fast evalsha");
connection.scriptingCommands().eval(SET_VERSIONED_VALUE_LUA_SCRIPT, ReturnType.VALUE, 1, rawKey, rawValue, rawVersion);
}
}
}
}
@Override
public void evict(K key, Long version) {
log.trace("evict [{}][{}]", key, version);
if (version != null) {
//TODO: use evict expiration
put(key, null, version);
}
}
@Override
public void putIfAbsent(K key, V value) {
log.trace("putIfAbsent [{}][{}]", key, value);
throw new NotImplementedException("putIfAbsent is not supported by versioned cache");
}
@Override
public void evict(Collection<K> keys) {
throw new NotImplementedException("evict by many keys is not supported by versioned cache");
}
@Override
public void evictOrPut(K key, V value) {
throw new NotImplementedException("evictOrPut is not supported by versioned cache");
}
@Override
public TbCacheTransaction<K, V> newTransactionForKey(K key) {
throw new NotImplementedException("newTransactionForKey is not supported by versioned cache");
}
@Override
public TbCacheTransaction<K, V> newTransactionForKeys(List<K> keys) {
throw new NotImplementedException("newTransactionForKeys is not supported by versioned cache");
}
}

View File

@ -0,0 +1,33 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.cache;
import org.thingsboard.server.common.data.HasVersion;
import java.io.Serializable;
public interface VersionedTbTransactionalCache<K extends Serializable, V extends Serializable & HasVersion> {
TbCacheValueWrapper<V> get(K key);
void put(K key, V value);
void put(K key, V value, Long version);
void evict(K key);
void evict(K key, Long version);
}

View File

@ -0,0 +1,20 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.common.data;
public interface HasVersion {
long getVersion();
}

View File

@ -15,10 +15,12 @@
*/
package org.thingsboard.server.common.data.kv;
import org.thingsboard.server.common.data.HasVersion;
/**
* @author Andrew Shvayka
*/
public interface AttributeKvEntry extends KvEntry {
public interface AttributeKvEntry extends KvEntry, HasVersion {
long getLastUpdateTs();

View File

@ -29,9 +29,18 @@ public class BaseAttributeKvEntry implements AttributeKvEntry {
@Valid
private final KvEntry kv;
private final Long version;
public BaseAttributeKvEntry(KvEntry kv, long lastUpdateTs) {
this.kv = kv;
this.lastUpdateTs = lastUpdateTs;
this.version = null;
}
public BaseAttributeKvEntry(KvEntry kv, long lastUpdateTs, Long version) {
this.kv = kv;
this.lastUpdateTs = lastUpdateTs;
this.version = version;
}
public BaseAttributeKvEntry(long lastUpdateTs, KvEntry kv) {
@ -88,6 +97,11 @@ public class BaseAttributeKvEntry implements AttributeKvEntry {
return kv.getValue();
}
@Override
public long getVersion() {
return version;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;

View File

@ -21,10 +21,17 @@ import lombok.Data;
@Data
@AllArgsConstructor
public class TbPair<S, T> {
public static final TbPair EMPTY = new TbPair<>(null, null);
private S first;
private T second;
public static <S, T> TbPair<S, T> of(S first, T second) {
return new TbPair<>(first, second);
}
@SuppressWarnings("unchecked")
public static <S, T> TbPair<S, T> emptyPair() {
return (TbPair<S, T>) EMPTY;
}
}

View File

@ -29,9 +29,9 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
public abstract class AbstractVersionedInsertRepository<T> extends AbstractInsertRepository {
import static org.thingsboard.server.dao.model.ModelConstants.VERSION_COLUMN;
public static final String VERSION_COLUMN = "version";
public abstract class AbstractVersionedInsertRepository<T> extends AbstractInsertRepository {
public List<Long> saveOrUpdate(List<T> entities) {
return transactionTemplate.execute(status -> {
@ -51,7 +51,7 @@ public abstract class AbstractVersionedInsertRepository<T> extends AbstractInser
for (int i = 0; i < updateResult.length; i++) {
if (updateResult[i] == 0) {
insertEntities.add(entities.get(i));
seqNumbers.add(0L);
seqNumbers.add(null);
toInsertIndexes.add(i);
} else {
seqNumbers.add((Long) seqNumbersList.get(keyHolderIndex).get(VERSION_COLUMN));
@ -63,12 +63,14 @@ public abstract class AbstractVersionedInsertRepository<T> extends AbstractInser
return seqNumbers;
}
onInsertOrUpdate(insertEntities, keyHolder);
int[] insertResult = onInsertOrUpdate(insertEntities, keyHolder);
seqNumbersList = keyHolder.getKeyList();
for (int i = 0; i < seqNumbersList.size(); i++) {
seqNumbers.set(toInsertIndexes.get(i), (Long) seqNumbersList.get(i).get(VERSION_COLUMN));
for (int i = 0; i < insertResult.length; i++) {
if (updateResult[i] != 0) {
seqNumbers.set(toInsertIndexes.get(i), (Long) seqNumbersList.get(i).get(VERSION_COLUMN));
}
}
return seqNumbers;
@ -89,8 +91,8 @@ public abstract class AbstractVersionedInsertRepository<T> extends AbstractInser
}, keyHolder);
}
private void onInsertOrUpdate(List<T> insertEntities, KeyHolder keyHolder) {
jdbcTemplate.batchUpdate(new SequencePreparedStatementCreator(getInsertOrUpdateQuery()), new BatchPreparedStatementSetter() {
private int[] onInsertOrUpdate(List<T> insertEntities, KeyHolder keyHolder) {
return jdbcTemplate.batchUpdate(new SequencePreparedStatementCreator(getInsertOrUpdateQuery()), new BatchPreparedStatementSetter() {
@Override
public void setValues(PreparedStatement ps, int i) throws SQLException {
setOnInsertOrUpdateValues(ps, i, insertEntities);

View File

@ -18,13 +18,13 @@ package org.thingsboard.server.dao.attributes;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.cache.CacheManager;
import org.springframework.stereotype.Service;
import org.thingsboard.server.cache.CaffeineTbTransactionalCache;
import org.thingsboard.server.cache.VersionedCaffeineTbTransactionalCache;
import org.thingsboard.server.common.data.CacheConstants;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
@ConditionalOnProperty(prefix = "cache", value = "type", havingValue = "caffeine", matchIfMissing = true)
@Service("AttributeCache")
public class AttributeCaffeineCache extends CaffeineTbTransactionalCache<AttributeCacheKey, AttributeKvEntry> {
public class AttributeCaffeineCache extends VersionedCaffeineTbTransactionalCache<AttributeCacheKey, AttributeKvEntry> {
public AttributeCaffeineCache(CacheManager cacheManager) {
super(cacheManager, CacheConstants.ATTRIBUTES_CACHE);

View File

@ -21,9 +21,9 @@ import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.serializer.SerializationException;
import org.springframework.stereotype.Service;
import org.thingsboard.server.cache.CacheSpecsMap;
import org.thingsboard.server.cache.RedisTbTransactionalCache;
import org.thingsboard.server.cache.TBRedisCacheConfiguration;
import org.thingsboard.server.cache.TbRedisSerializer;
import org.thingsboard.server.cache.VersionedRedisTbTransactionalCache;
import org.thingsboard.server.common.data.CacheConstants;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
@ -38,7 +38,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.KeyValueType;
@ConditionalOnProperty(prefix = "cache", value = "type", havingValue = "redis")
@Service("AttributeCache")
public class AttributeRedisCache extends RedisTbTransactionalCache<AttributeCacheKey, AttributeKvEntry> {
public class AttributeRedisCache extends VersionedRedisTbTransactionalCache<AttributeCacheKey, AttributeKvEntry> {
public AttributeRedisCache(TBRedisCacheConfiguration configuration, CacheSpecsMap cacheSpecsMap, RedisConnectionFactory connectionFactory) {
super(CacheConstants.ATTRIBUTES_CACHE, cacheSpecsMap, connectionFactory, configuration, new TbRedisSerializer<>() {
@ -82,26 +82,15 @@ public class AttributeRedisCache extends RedisTbTransactionalCache<AttributeCach
try {
AttributeValueProto proto = AttributeValueProto.parseFrom(bytes);
boolean hasValue = proto.getHasV();
KvEntry entry;
switch (proto.getType()) {
case BOOLEAN_V:
entry = new BooleanDataEntry(key.getKey(), hasValue ? proto.getBoolV() : null);
break;
case LONG_V:
entry = new LongDataEntry(key.getKey(), hasValue ? proto.getLongV() : null);
break;
case DOUBLE_V:
entry = new DoubleDataEntry(key.getKey(), hasValue ? proto.getDoubleV() : null);
break;
case STRING_V:
entry = new StringDataEntry(key.getKey(), hasValue ? proto.getStringV() : null);
break;
case JSON_V:
entry = new JsonDataEntry(key.getKey(), hasValue ? proto.getJsonV() : null);
break;
default:
throw new InvalidProtocolBufferException("Unrecognized type: " + proto.getType() + " !");
}
KvEntry entry = switch (proto.getType()) {
case BOOLEAN_V -> new BooleanDataEntry(key.getKey(), hasValue ? proto.getBoolV() : null);
case LONG_V -> new LongDataEntry(key.getKey(), hasValue ? proto.getLongV() : null);
case DOUBLE_V -> new DoubleDataEntry(key.getKey(), hasValue ? proto.getDoubleV() : null);
case STRING_V -> new StringDataEntry(key.getKey(), hasValue ? proto.getStringV() : null);
case JSON_V -> new JsonDataEntry(key.getKey(), hasValue ? proto.getJsonV() : null);
default ->
throw new InvalidProtocolBufferException("Unrecognized type: " + proto.getType() + " !");
};
return new BaseAttributeKvEntry(proto.getLastUpdateTs(), entry);
} catch (InvalidProtocolBufferException e) {
throw new SerializationException(e.getMessage());

View File

@ -22,6 +22,7 @@ import org.thingsboard.server.common.data.id.DeviceProfileId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.util.TbPair;
import java.util.Collection;
import java.util.List;
@ -42,6 +43,8 @@ public interface AttributesDao {
List<ListenableFuture<String>> removeAll(TenantId tenantId, EntityId entityId, AttributeScope attributeScope, List<String> keys);
List<ListenableFuture<TbPair<String, Long>>> removeAllWithVersions(TenantId tenantId, EntityId entityId, AttributeScope attributeScope, List<String> keys);
List<String> findAllKeysByDeviceProfileId(TenantId tenantId, DeviceProfileId deviceProfileId);
List<String> findAllKeysByEntityIds(TenantId tenantId, List<EntityId> entityIds);

View File

@ -27,13 +27,14 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Primary;
import org.springframework.stereotype.Service;
import org.thingsboard.server.cache.TbCacheValueWrapper;
import org.thingsboard.server.cache.TbTransactionalCache;
import org.thingsboard.server.cache.VersionedTbTransactionalCache;
import org.thingsboard.server.common.data.AttributeScope;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.id.DeviceProfileId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.util.TbPair;
import org.thingsboard.server.common.stats.DefaultCounter;
import org.thingsboard.server.common.stats.StatsFactory;
import org.thingsboard.server.dao.cache.CacheExecutorService;
@ -67,7 +68,7 @@ public class CachedAttributesService implements AttributesService {
private final CacheExecutorService cacheExecutorService;
private final DefaultCounter hitCounter;
private final DefaultCounter missCounter;
private final TbTransactionalCache<AttributeCacheKey, AttributeKvEntry> cache;
private final VersionedTbTransactionalCache<AttributeCacheKey, AttributeKvEntry> cache;
private ListeningExecutorService cacheExecutor;
@Value("${cache.type:caffeine}")
@ -79,7 +80,7 @@ public class CachedAttributesService implements AttributesService {
JpaExecutorService jpaExecutorService,
StatsFactory statsFactory,
CacheExecutorService cacheExecutorService,
TbTransactionalCache<AttributeCacheKey, AttributeKvEntry> cache) {
VersionedTbTransactionalCache<AttributeCacheKey, AttributeKvEntry> cache) {
this.attributesDao = attributesDao;
this.jpaExecutorService = jpaExecutorService;
this.cacheExecutorService = cacheExecutorService;
@ -122,17 +123,9 @@ public class CachedAttributesService implements AttributesService {
return Optional.ofNullable(cachedAttributeKvEntry);
} else {
missCounter.increment();
var cacheTransaction = cache.newTransactionForKey(attributeCacheKey);
try {
Optional<AttributeKvEntry> result = attributesDao.find(tenantId, entityId, scope, attributeKey);
cacheTransaction.putIfAbsent(attributeCacheKey, result.orElse(null));
cacheTransaction.commit();
return result;
} catch (Throwable e) {
cacheTransaction.rollback();
log.debug("Could not find attribute from cache: [{}] [{}] [{}]", entityId, scope, attributeKey, e);
throw e;
}
Optional<AttributeKvEntry> result = attributesDao.find(tenantId, entityId, scope, attributeKey);
cache.put(attributeCacheKey, result.orElse(null));
return result;
}
});
}
@ -163,28 +156,19 @@ public class CachedAttributesService implements AttributesService {
// DB call should run in DB executor, not in cache-related executor
return jpaExecutorService.submit(() -> {
var cacheTransaction = cache.newTransactionForKeys(notFoundKeys);
try {
log.trace("[{}][{}] Lookup attributes from db: {}", entityId, scope, notFoundAttributeKeys);
List<AttributeKvEntry> result = attributesDao.find(tenantId, entityId, scope, notFoundAttributeKeys);
for (AttributeKvEntry foundInDbAttribute : result) {
AttributeCacheKey attributeCacheKey = new AttributeCacheKey(scope, entityId, foundInDbAttribute.getKey());
cacheTransaction.putIfAbsent(attributeCacheKey, foundInDbAttribute);
notFoundAttributeKeys.remove(foundInDbAttribute.getKey());
}
for (String key : notFoundAttributeKeys) {
cacheTransaction.putIfAbsent(new AttributeCacheKey(scope, entityId, key), null);
}
List<AttributeKvEntry> mergedAttributes = new ArrayList<>(cachedAttributes);
mergedAttributes.addAll(result);
cacheTransaction.commit();
log.trace("[{}][{}] Commit cache transaction: {}", entityId, scope, notFoundAttributeKeys);
return mergedAttributes;
} catch (Throwable e) {
cacheTransaction.rollback();
log.debug("Could not find attributes from cache: [{}] [{}] [{}]", entityId, scope, notFoundAttributeKeys, e);
throw e;
log.trace("[{}][{}] Lookup attributes from db: {}", entityId, scope, notFoundAttributeKeys);
List<AttributeKvEntry> result = attributesDao.find(tenantId, entityId, scope, notFoundAttributeKeys);
for (AttributeKvEntry foundInDbAttribute : result) {
put(entityId, scope, foundInDbAttribute, foundInDbAttribute.getVersion());
notFoundAttributeKeys.remove(foundInDbAttribute.getKey());
}
for (String key : notFoundAttributeKeys) {
cache.put(new AttributeCacheKey(scope, entityId, key), null);
}
List<AttributeKvEntry> mergedAttributes = new ArrayList<>(cachedAttributes);
mergedAttributes.addAll(result);
log.trace("[{}][{}] Commit cache transaction: {}", entityId, scope, notFoundAttributeKeys);
return mergedAttributes;
});
}, MoreExecutors.directExecutor()); // cacheExecutor analyse and returns results or submit to DB executor
@ -235,7 +219,7 @@ public class CachedAttributesService implements AttributesService {
validate(entityId, scope);
AttributeUtils.validate(attribute, valueNoXssValidation);
ListenableFuture<Long> future = attributesDao.save(tenantId, entityId, scope, attribute);
return Futures.transform(future, version -> evict(entityId, scope, attribute, version), cacheExecutor);
return Futures.transform(future, version -> put(entityId, scope, attribute, version), cacheExecutor);
}
@Override
@ -251,17 +235,17 @@ public class CachedAttributesService implements AttributesService {
List<ListenableFuture<Long>> futures = new ArrayList<>(attributes.size());
for (var attribute : attributes) {
ListenableFuture<Long> future = attributesDao.save(tenantId, entityId, scope, attribute);
futures.add(Futures.transform(future, version -> evict(entityId, scope, attribute, version), cacheExecutor));
futures.add(Futures.transform(future, version -> put(entityId, scope, attribute, version), cacheExecutor));
}
return Futures.allAsList(futures);
}
private Long evict(EntityId entityId, AttributeScope scope, AttributeKvEntry attribute, Long version) {
private Long put(EntityId entityId, AttributeScope scope, AttributeKvEntry attribute, Long version) {
String key = attribute.getKey();
log.trace("[{}][{}][{}] Before cache evict: {}", entityId, scope, key, attribute);
cache.evictOrPut(new AttributeCacheKey(scope, entityId, key), attribute);
log.trace("[{}][{}][{}] after cache evict.", entityId, scope, key);
log.trace("[{}][{}][{}] Before cache put: {}", entityId, scope, key, attribute);
cache.put(new AttributeCacheKey(scope, entityId, key), attribute, version);
log.trace("[{}][{}][{}] after cache put.", entityId, scope, key);
return version;
}
@ -273,9 +257,10 @@ public class CachedAttributesService implements AttributesService {
@Override
public ListenableFuture<List<String>> removeAll(TenantId tenantId, EntityId entityId, AttributeScope scope, List<String> attributeKeys) {
validate(entityId, scope);
List<ListenableFuture<String>> futures = attributesDao.removeAll(tenantId, entityId, scope, attributeKeys);
return Futures.allAsList(futures.stream().map(future -> Futures.transform(future, key -> {
cache.evict(new AttributeCacheKey(scope, entityId, key));
List<ListenableFuture<TbPair<String, Long>>> futures = attributesDao.removeAllWithVersions(tenantId, entityId, scope, attributeKeys);
return Futures.allAsList(futures.stream().map(future -> Futures.transform(future, keyVersionPair -> {
String key = keyVersionPair.getFirst();
cache.evict(new AttributeCacheKey(scope, entityId, key), keyVersionPair.getSecond());
return key;
}, cacheExecutor)).collect(Collectors.toList()));
}

View File

@ -0,0 +1,20 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.dao.model;
public interface BaseVersionedEntity {
long getVersion();
}

View File

@ -56,6 +56,7 @@ public class ModelConstants {
public static final String ATTRIBUTE_TYPE_COLUMN = "attribute_type";
public static final String ATTRIBUTE_KEY_COLUMN = "attribute_key";
public static final String LAST_UPDATE_TS_COLUMN = "last_update_ts";
public static final String VERSION_COLUMN = "version";
/**
* User constants.

View File

@ -16,6 +16,7 @@
package org.thingsboard.server.dao.model.sql;
import lombok.Data;
import lombok.EqualsAndHashCode;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
import org.thingsboard.server.common.data.kv.BooleanDataEntry;
@ -41,9 +42,10 @@ import static org.thingsboard.server.dao.model.ModelConstants.LONG_VALUE_COLUMN;
import static org.thingsboard.server.dao.model.ModelConstants.STRING_VALUE_COLUMN;
@Data
@EqualsAndHashCode(callSuper = true)
@Entity
@Table(name = "attribute_kv")
public class AttributeKvEntity implements ToData<AttributeKvEntry>, Serializable {
public class AttributeKvEntity extends VersionedEntity implements ToData<AttributeKvEntry>, Serializable {
@EmbeddedId
private AttributeKvCompositeKey id;
@ -84,6 +86,6 @@ public class AttributeKvEntity implements ToData<AttributeKvEntry>, Serializable
kvEntry = new JsonDataEntry(strKey, jsonValue);
}
return new BaseAttributeKvEntry(kvEntry, lastUpdateTs);
return new BaseAttributeKvEntry(kvEntry, lastUpdateTs, version);
}
}

View File

@ -0,0 +1,30 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.dao.model.sql;
import jakarta.persistence.Column;
import jakarta.persistence.MappedSuperclass;
import lombok.Data;
import static org.thingsboard.server.dao.model.ModelConstants.VERSION_COLUMN;
@Data
@MappedSuperclass
public abstract class VersionedEntity {
@Column(name = VERSION_COLUMN)
protected Long version;
}

View File

@ -31,14 +31,14 @@ import java.util.List;
@SqlDao
public class AttributeKvInsertRepository extends AbstractVersionedInsertRepository<AttributeKvEntity> {
private static final String BATCH_UPDATE = "UPDATE attribute_kv SET str_v = ?, long_v = ?, dbl_v = ?, bool_v = ?, json_v = cast(? AS json), last_update_ts = ?, version = version + 1 " +
private static final String BATCH_UPDATE = "UPDATE attribute_kv SET str_v = ?, long_v = ?, dbl_v = ?, bool_v = ?, json_v = cast(? AS json), last_update_ts = ?, version = nextval('attribute_kv_version_seq') " +
"WHERE entity_id = ? and attribute_type =? and attribute_key = ? RETURNING version;";
private static final String INSERT_OR_UPDATE =
"INSERT INTO attribute_kv (entity_id, attribute_type, attribute_key, str_v, long_v, dbl_v, bool_v, json_v, last_update_ts) " +
"VALUES(?, ?, ?, ?, ?, ?, ?, cast(? AS json), ?) " +
"INSERT INTO attribute_kv (entity_id, attribute_type, attribute_key, str_v, long_v, dbl_v, bool_v, json_v, last_update_ts, version) " +
"VALUES(?, ?, ?, ?, ?, ?, ?, cast(? AS json), ?, nextval('attribute_kv_version_seq')) " +
"ON CONFLICT (entity_id, attribute_type, attribute_key) " +
"DO UPDATE SET str_v = ?, long_v = ?, dbl_v = ?, bool_v = ?, json_v = cast(? AS json), last_update_ts = ?, version = attribute_kv.version + 1 RETURNING version;";
"DO UPDATE SET str_v = ?, long_v = ?, dbl_v = ?, bool_v = ?, json_v = cast(? AS json), last_update_ts = ?, version = nextval('attribute_kv_version_seq') RETURNING version;";
@Override
protected void setOnBatchUpdateValues(PreparedStatement ps, int i, List<AttributeKvEntity> entities) throws SQLException {

View File

@ -30,15 +30,15 @@ public interface AttributeKvRepository extends JpaRepository<AttributeKvEntity,
@Query("SELECT a FROM AttributeKvEntity a WHERE a.id.entityId = :entityId " +
"AND a.id.attributeType = :attributeType")
List<AttributeKvEntity> findAllEntityIdAndAttributeType(@Param("entityId") UUID entityId,
@Param("attributeType") int attributeType);
List<AttributeKvEntity> findAllByEntityIdAndAttributeType(@Param("entityId") UUID entityId,
@Param("attributeType") int attributeType);
@Transactional
@Modifying
@Query("DELETE FROM AttributeKvEntity a WHERE a.id.entityId = :entityId " +
"AND a.id.attributeType = :attributeType " +
"AND a.id.attributeKey = :attributeKey")
void delete(@Param("entityId") UUID entityId,
@Query(value = "DELETE FROM attribute_kv WHERE entity_id = :entityId " +
"AND attribute_type = :attributeType " +
"AND attribute_key = :attributeKey RETURNING version", nativeQuery = true)
Long delete(@Param("entityId") UUID entityId,
@Param("attributeType") int attributeType,
@Param("attributeKey") int attributeKey);

View File

@ -30,6 +30,7 @@ import org.thingsboard.server.common.data.id.DeviceProfileId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.util.TbPair;
import org.thingsboard.server.common.stats.StatsFactory;
import org.thingsboard.server.dao.DaoUtil;
import org.thingsboard.server.dao.attributes.AttributesDao;
@ -144,7 +145,7 @@ public class JpaAttributeDao extends JpaAbstractDaoListeningExecutorService impl
@Override
public List<AttributeKvEntry> findAll(TenantId tenantId, EntityId entityId, AttributeScope attributeScope) {
List<AttributeKvEntity> attributes = attributeKvRepository.findAllEntityIdAndAttributeType(
List<AttributeKvEntity> attributes = attributeKvRepository.findAllByEntityIdAndAttributeType(
entityId.getId(),
attributeScope.getId());
attributes.forEach(attributeKvEntity -> attributeKvEntity.setStrKey(keyDictionaryDao.getKey(attributeKvEntity.getId().getAttributeKey())));
@ -205,6 +206,18 @@ public class JpaAttributeDao extends JpaAbstractDaoListeningExecutorService impl
return futuresList;
}
@Override
public List<ListenableFuture<TbPair<String, Long>>> removeAllWithVersions(TenantId tenantId, EntityId entityId, AttributeScope attributeScope, List<String> keys) {
List<ListenableFuture<TbPair<String, Long>>> futuresList = new ArrayList<>(keys.size());
for (String key : keys) {
futuresList.add(service.submit(() -> {
Long version = attributeKvRepository.delete(entityId.getId(), attributeScope.getId(), keyDictionaryDao.getOrSaveKeyId(key));
return TbPair.of(key, version);
}));
}
return futuresList;
}
@Transactional
@Override
public List<Pair<AttributeScope, String>> removeAllByEntityId(TenantId tenantId, EntityId entityId) {

View File

@ -40,11 +40,11 @@ public class SqlLatestInsertTsRepository extends AbstractVersionedInsertReposito
private Boolean updateByLatestTs;
private static final String BATCH_UPDATE =
"UPDATE ts_kv_latest SET ts = ?, bool_v = ?, str_v = ?, long_v = ?, dbl_v = ?, json_v = cast(? AS json), version = version + 1 WHERE entity_id = ? AND key = ?";
"UPDATE ts_kv_latest SET ts = ?, bool_v = ?, str_v = ?, long_v = ?, dbl_v = ?, json_v = cast(? AS json), version = nextval('ts_kv_latest_version_seq') WHERE entity_id = ? AND key = ?";
private static final String INSERT_OR_UPDATE =
"INSERT INTO ts_kv_latest (entity_id, key, ts, bool_v, str_v, long_v, dbl_v, json_v) VALUES(?, ?, ?, ?, ?, ?, ?, cast(? AS json)) " +
"ON CONFLICT (entity_id, key) DO UPDATE SET ts = ?, bool_v = ?, str_v = ?, long_v = ?, dbl_v = ?, json_v = cast(? AS json), version = ts_kv_latest.version + 1";
"INSERT INTO ts_kv_latest (entity_id, key, ts, bool_v, str_v, long_v, dbl_v, json_v, version) VALUES(?, ?, ?, ?, ?, ?, ?, cast(? AS json), nextval('ts_kv_latest_version_seq')) " +
"ON CONFLICT (entity_id, key) DO UPDATE SET ts = ?, bool_v = ?, str_v = ?, long_v = ?, dbl_v = ?, json_v = cast(? AS json), version = nextval('ts_kv_latest_version_seq')";
private static final String BATCH_UPDATE_BY_LATEST_TS = BATCH_UPDATE + " AND ts_kv_latest.ts <= ?";

View File

@ -102,6 +102,8 @@ CREATE TABLE IF NOT EXISTS audit_log (
action_failure_details varchar(1000000)
) PARTITION BY RANGE (created_time);
CREATE SEQUENCE IF NOT EXISTS attribute_kv_version_seq cache 1000;
CREATE TABLE IF NOT EXISTS attribute_kv (
entity_id uuid,
attribute_type int,
@ -112,7 +114,7 @@ CREATE TABLE IF NOT EXISTS attribute_kv (
dbl_v double precision,
json_v json,
last_update_ts bigint,
version bigint default 0,
version bigint default 1,
CONSTRAINT attribute_kv_pkey PRIMARY KEY (entity_id, attribute_type, attribute_key)
);
@ -539,6 +541,8 @@ CREATE TABLE IF NOT EXISTS entity_view (
CONSTRAINT entity_view_external_id_unq_key UNIQUE (tenant_id, external_id)
);
CREATE SEQUENCE IF NOT EXISTS ts_kv_latest_version_seq cache 1000;
CREATE TABLE IF NOT EXISTS ts_kv_latest
(
entity_id uuid NOT NULL,
@ -549,7 +553,7 @@ CREATE TABLE IF NOT EXISTS ts_kv_latest
long_v bigint,
dbl_v double precision,
json_v json,
version bigint default 0,
version bigint default 1,
CONSTRAINT ts_kv_latest_pkey PRIMARY KEY (entity_id, key)
);

View File

@ -34,6 +34,8 @@ CREATE TABLE IF NOT EXISTS key_dictionary (
CONSTRAINT key_dictionary_id_pkey PRIMARY KEY (key)
);
CREATE SEQUENCE IF NOT EXISTS ts_kv_latest_version_seq cache 1000;
CREATE TABLE IF NOT EXISTS ts_kv_latest (
entity_id uuid NOT NULL,
key int NOT NULL,
@ -43,7 +45,7 @@ CREATE TABLE IF NOT EXISTS ts_kv_latest (
long_v bigint,
dbl_v double precision,
json_v json,
version bigint default 0,
version bigint default 1,
CONSTRAINT ts_kv_latest_pkey PRIMARY KEY (entity_id, key)
);

View File

@ -14,6 +14,8 @@
-- limitations under the License.
--
CREATE SEQUENCE IF NOT EXISTS ts_kv_latest_version_seq cache 1000;
CREATE TABLE IF NOT EXISTS ts_kv_latest
(
entity_id uuid NOT NULL,
@ -24,6 +26,6 @@ CREATE TABLE IF NOT EXISTS ts_kv_latest
long_v bigint,
dbl_v double precision,
json_v json,
version bigint default 0,
version bigint default 1,
CONSTRAINT ts_kv_latest_pkey PRIMARY KEY (entity_id, key)
);
);

View File

@ -23,6 +23,7 @@ DROP TABLE IF EXISTS alarm_type;
DROP TABLE IF EXISTS asset;
DROP TABLE IF EXISTS audit_log;
DROP TABLE IF EXISTS attribute_kv;
DROP SEQUENCE IF EXISTS attribute_kv_version_seq;
DROP TABLE IF EXISTS component_descriptor;
DROP TABLE IF EXISTS customer;
DROP TABLE IF EXISTS device;
@ -36,6 +37,7 @@ DROP TABLE IF EXISTS relation;
DROP TABLE IF EXISTS tenant;
DROP TABLE IF EXISTS ts_kv;
DROP TABLE IF EXISTS ts_kv_latest;
DROP SEQUENCE IF EXISTS ts_kv_latest_version_seq;
DROP TABLE IF EXISTS ts_kv_dictionary;
DROP TABLE IF EXISTS user_credentials;
DROP TABLE IF EXISTS widgets_bundle_widget;