From 118407d982cea7e2cfff7937511560cf96b8736a Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Wed, 26 Jun 2024 15:51:32 +0200 Subject: [PATCH] implemented versioned cache --- .../cache/RedisTbTransactionalCache.java | 15 +- ...VersionedCaffeineTbTransactionalCache.java | 91 ++++++++++ .../VersionedRedisTbTransactionalCache.java | 163 ++++++++++++++++++ .../cache/VersionedTbTransactionalCache.java | 33 ++++ .../server/common/data/HasVersion.java | 20 +++ .../common/data/kv/AttributeKvEntry.java | 4 +- .../common/data/kv/BaseAttributeKvEntry.java | 14 ++ .../server/common/data/util/TbPair.java | 7 + .../AbstractVersionedInsertRepository.java | 18 +- .../attributes/AttributeCaffeineCache.java | 4 +- .../dao/attributes/AttributeRedisCache.java | 33 ++-- .../server/dao/attributes/AttributesDao.java | 3 + .../attributes/CachedAttributesService.java | 73 ++++---- .../server/dao/model/BaseVersionedEntity.java | 20 +++ .../server/dao/model/ModelConstants.java | 1 + .../dao/model/sql/AttributeKvEntity.java | 6 +- .../server/dao/model/sql/VersionedEntity.java | 30 ++++ .../AttributeKvInsertRepository.java | 8 +- .../sql/attributes/AttributeKvRepository.java | 12 +- .../dao/sql/attributes/JpaAttributeDao.java | 15 +- .../sql/SqlLatestInsertTsRepository.java | 6 +- .../main/resources/sql/schema-entities.sql | 8 +- .../main/resources/sql/schema-timescale.sql | 4 +- .../resources/sql/schema-ts-latest-psql.sql | 6 +- .../resources/sql/psql/drop-all-tables.sql | 2 + 25 files changed, 492 insertions(+), 104 deletions(-) create mode 100644 common/cache/src/main/java/org/thingsboard/server/cache/VersionedCaffeineTbTransactionalCache.java create mode 100644 common/cache/src/main/java/org/thingsboard/server/cache/VersionedRedisTbTransactionalCache.java create mode 100644 common/cache/src/main/java/org/thingsboard/server/cache/VersionedTbTransactionalCache.java create mode 100644 common/data/src/main/java/org/thingsboard/server/common/data/HasVersion.java create mode 100644 dao/src/main/java/org/thingsboard/server/dao/model/BaseVersionedEntity.java create mode 100644 dao/src/main/java/org/thingsboard/server/dao/model/sql/VersionedEntity.java diff --git a/common/cache/src/main/java/org/thingsboard/server/cache/RedisTbTransactionalCache.java b/common/cache/src/main/java/org/thingsboard/server/cache/RedisTbTransactionalCache.java index abfbb398c9..543739aad4 100644 --- a/common/cache/src/main/java/org/thingsboard/server/cache/RedisTbTransactionalCache.java +++ b/common/cache/src/main/java/org/thingsboard/server/cache/RedisTbTransactionalCache.java @@ -29,7 +29,6 @@ import org.springframework.data.redis.core.types.Expiration; import org.springframework.data.redis.serializer.RedisSerializer; import org.springframework.data.redis.serializer.StringRedisSerializer; import org.thingsboard.server.common.data.FstStatsService; -import redis.clients.jedis.Connection; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; import redis.clients.jedis.util.JedisClusterCRC16; @@ -44,7 +43,7 @@ import java.util.concurrent.TimeUnit; @Slf4j public abstract class RedisTbTransactionalCache implements TbTransactionalCache { - private static final byte[] BINARY_NULL_VALUE = RedisSerializer.java().serialize(NullValue.INSTANCE); + static final byte[] BINARY_NULL_VALUE = RedisSerializer.java().serialize(NullValue.INSTANCE); static final JedisPool MOCK_POOL = new JedisPool(); //non-null pool required for JedisConnection to trigger closing jedis connection @Autowired @@ -79,7 +78,7 @@ public abstract class RedisTbTransactionalCache get(K key) { try (var connection = connectionFactory.getConnection()) { byte[] rawKey = getRawKey(key); - byte[] rawValue = connection.get(rawKey); + byte[] rawValue = doGet(connection, rawKey); if (rawValue == null) { return null; } else if (Arrays.equals(rawValue, BINARY_NULL_VALUE)) { @@ -96,6 +95,10 @@ public abstract class RedisTbTransactionalCache(this, connection); } - private RedisConnection getConnection(byte[] rawKey) { + protected RedisConnection getConnection(byte[] rawKey) { if (!connectionFactory.isRedisClusterAware()) { return connectionFactory.getConnection(); } @@ -180,7 +183,7 @@ public abstract class RedisTbTransactionalCache implements VersionedTbTransactionalCache { + + private final CacheManager cacheManager; + private final String cacheName; + + private final Lock lock = new ReentrantLock(); + + @Override + public TbCacheValueWrapper get(K key) { + return SimpleTbCacheValueWrapper.wrap(doGet(key).getSecond()); + } + + @Override + public void put(K key, V value) { + Long version = value != null ? value.getVersion() : 0; + put(key, value, version); + } + + @Override + public void put(K key, V value, Long version) { + if (version == null) { + return; + } + lock.lock(); + try { + TbPair versionValuePair = doGet(key); + Long currentVersion = versionValuePair.getFirst(); + if (currentVersion == null || version >= currentVersion) { + cacheManager.getCache(cacheName).put(key, TbPair.of(version, value)); + } + } finally { + lock.unlock(); + } + } + + private TbPair doGet(K key) { + Cache.ValueWrapper source = cacheManager.getCache(cacheName).get(key); + return source == null ? TbPair.emptyPair() : (TbPair) source.get(); + } + + @Override + public void evict(K key) { + lock.lock(); + try { + cacheManager.getCache(cacheName).evict(key); + } finally { + lock.unlock(); + } + } + + @Override + public void evict(K key, Long version) { + if (version == null) { + return; + } + lock.lock(); + try { + put(key, null, version); + } finally { + lock.unlock(); + } + } +} diff --git a/common/cache/src/main/java/org/thingsboard/server/cache/VersionedRedisTbTransactionalCache.java b/common/cache/src/main/java/org/thingsboard/server/cache/VersionedRedisTbTransactionalCache.java new file mode 100644 index 0000000000..34b46d535a --- /dev/null +++ b/common/cache/src/main/java/org/thingsboard/server/cache/VersionedRedisTbTransactionalCache.java @@ -0,0 +1,163 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.cache; + +import jakarta.annotation.PostConstruct; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.NotImplementedException; +import org.springframework.dao.InvalidDataAccessApiUsageException; +import org.springframework.data.redis.connection.RedisConnection; +import org.springframework.data.redis.connection.RedisConnectionFactory; +import org.springframework.data.redis.connection.ReturnType; +import org.springframework.data.redis.serializer.StringRedisSerializer; +import org.thingsboard.server.common.data.HasVersion; + +import java.io.Serializable; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; + +@Slf4j +public abstract class VersionedRedisTbTransactionalCache extends RedisTbTransactionalCache implements VersionedTbTransactionalCache { + + private static final int VERSION_SIZE = 8; + private static final int VALUE_END_OFFSET = -1; + + static final byte[] SET_VERSIONED_VALUE_LUA_SCRIPT = StringRedisSerializer.UTF_8.serialize(""" + -- KEYS[1] is the key + -- ARGV[1] is the new value + -- ARGV[2] is the new version + + local key = KEYS[1] + local newValue = ARGV[1] + local newVersion = tonumber(ARGV[2]) + + -- Function to set the new value with the version + local function setNewValue() + local newValueWithVersion = struct.pack(">I8", newVersion) .. newValue:sub(9) + redis.call('SET', key, newValueWithVersion) + end + + -- Get the current version (first 8 bytes) of the current value + local currentVersionBytes = redis.call('GETRANGE', key, 0, 7) + + if currentVersionBytes and #currentVersionBytes == 8 then + -- Extract the current version from the first 8 bytes + local currentVersion = tonumber(struct.unpack(">I8", currentVersionBytes)) + + if newVersion >= currentVersion then + setNewValue() + end + else + -- If the current value is absent or the current version is not found, set the new value + setNewValue() + end + """); + static final byte[] SET_VERSIONED_VALUE_SHA = StringRedisSerializer.UTF_8.serialize("041b109dd56f6c8afb55090076e754727a5d3da0"); + + public VersionedRedisTbTransactionalCache(String cacheName, CacheSpecsMap cacheSpecsMap, RedisConnectionFactory connectionFactory, TBRedisCacheConfiguration configuration, TbRedisSerializer valueSerializer) { + super(cacheName, cacheSpecsMap, connectionFactory, configuration, valueSerializer); + } + + @PostConstruct + public void init() { + try (var connection = getConnection(SET_VERSIONED_VALUE_SHA)) { + log.debug("Loading LUA with expected SHA[{}], connection [{}]", new String(SET_VERSIONED_VALUE_SHA), connection.getNativeConnection()); + String sha = connection.scriptingCommands().scriptLoad(SET_VERSIONED_VALUE_LUA_SCRIPT); + if (!Arrays.equals(SET_VERSIONED_VALUE_SHA, StringRedisSerializer.UTF_8.serialize(sha))) { + log.error("SHA for SET_VERSIONED_VALUE_LUA_SCRIPT wrong! Expected [{}], but actual [{}], connection [{}]", new String(SET_VERSIONED_VALUE_SHA), sha, connection.getNativeConnection()); + } + } catch (Throwable t) { + log.error("Error on Redis versioned cache init", t); + } + } + + @Override + protected byte[] doGet(RedisConnection connection, byte[] rawKey) { + return connection.stringCommands().getRange(rawKey, VERSION_SIZE, VALUE_END_OFFSET); + } + + @Override + public void put(K key, V value) { + Long version = value!= null ? value.getVersion() : 0; + put(key, value, version); + } + + @Override + public void put(K key, V value, Long version) { + //TODO: use expiration + log.trace("put [{}][{}][{}]", key, value, version); + if (version == null) { + return; + } + final byte[] rawKey = getRawKey(key); + try (var connection = getConnection(rawKey)) { + byte[] rawValue = getRawValue(value); + byte[] rawVersion = StringRedisSerializer.UTF_8.serialize(String.valueOf(version)); + try { + connection.scriptingCommands().evalSha(SET_VERSIONED_VALUE_SHA, ReturnType.VALUE, 1, rawKey, rawValue, rawVersion); + } catch (InvalidDataAccessApiUsageException e) { + log.debug("loading LUA [{}]", connection.getNativeConnection()); + String sha = connection.scriptingCommands().scriptLoad(SET_VERSIONED_VALUE_LUA_SCRIPT); + if (!Arrays.equals(SET_VERSIONED_VALUE_SHA, StringRedisSerializer.UTF_8.serialize(sha))) { + log.error("SHA for SET_VERSIONED_VALUE_LUA_SCRIPT wrong! Expected [{}], but actual [{}]", new String(SET_VERSIONED_VALUE_SHA), sha); + } + try { + connection.scriptingCommands().evalSha(SET_VERSIONED_VALUE_SHA, ReturnType.VALUE, 1, rawKey, rawValue, rawVersion); + } catch (InvalidDataAccessApiUsageException ignored) { + log.debug("Slowly executing eval instead of fast evalsha"); + connection.scriptingCommands().eval(SET_VERSIONED_VALUE_LUA_SCRIPT, ReturnType.VALUE, 1, rawKey, rawValue, rawVersion); + } + } + } + } + + @Override + public void evict(K key, Long version) { + log.trace("evict [{}][{}]", key, version); + if (version != null) { + //TODO: use evict expiration + put(key, null, version); + } + } + + @Override + public void putIfAbsent(K key, V value) { + log.trace("putIfAbsent [{}][{}]", key, value); + throw new NotImplementedException("putIfAbsent is not supported by versioned cache"); + } + + @Override + public void evict(Collection keys) { + throw new NotImplementedException("evict by many keys is not supported by versioned cache"); + } + + @Override + public void evictOrPut(K key, V value) { + throw new NotImplementedException("evictOrPut is not supported by versioned cache"); + } + + @Override + public TbCacheTransaction newTransactionForKey(K key) { + throw new NotImplementedException("newTransactionForKey is not supported by versioned cache"); + } + + @Override + public TbCacheTransaction newTransactionForKeys(List keys) { + throw new NotImplementedException("newTransactionForKeys is not supported by versioned cache"); + } + +} diff --git a/common/cache/src/main/java/org/thingsboard/server/cache/VersionedTbTransactionalCache.java b/common/cache/src/main/java/org/thingsboard/server/cache/VersionedTbTransactionalCache.java new file mode 100644 index 0000000000..72f4a8c466 --- /dev/null +++ b/common/cache/src/main/java/org/thingsboard/server/cache/VersionedTbTransactionalCache.java @@ -0,0 +1,33 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.cache; + +import org.thingsboard.server.common.data.HasVersion; + +import java.io.Serializable; + +public interface VersionedTbTransactionalCache { + + TbCacheValueWrapper get(K key); + + void put(K key, V value); + + void put(K key, V value, Long version); + + void evict(K key); + + void evict(K key, Long version); +} diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/HasVersion.java b/common/data/src/main/java/org/thingsboard/server/common/data/HasVersion.java new file mode 100644 index 0000000000..fbeb8ef3b6 --- /dev/null +++ b/common/data/src/main/java/org/thingsboard/server/common/data/HasVersion.java @@ -0,0 +1,20 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.common.data; + +public interface HasVersion { + long getVersion(); +} diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/kv/AttributeKvEntry.java b/common/data/src/main/java/org/thingsboard/server/common/data/kv/AttributeKvEntry.java index 19057fb1aa..c63c953170 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/kv/AttributeKvEntry.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/kv/AttributeKvEntry.java @@ -15,10 +15,12 @@ */ package org.thingsboard.server.common.data.kv; +import org.thingsboard.server.common.data.HasVersion; + /** * @author Andrew Shvayka */ -public interface AttributeKvEntry extends KvEntry { +public interface AttributeKvEntry extends KvEntry, HasVersion { long getLastUpdateTs(); diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/kv/BaseAttributeKvEntry.java b/common/data/src/main/java/org/thingsboard/server/common/data/kv/BaseAttributeKvEntry.java index ced1486c14..6d48693511 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/kv/BaseAttributeKvEntry.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/kv/BaseAttributeKvEntry.java @@ -29,9 +29,18 @@ public class BaseAttributeKvEntry implements AttributeKvEntry { @Valid private final KvEntry kv; + private final Long version; + public BaseAttributeKvEntry(KvEntry kv, long lastUpdateTs) { this.kv = kv; this.lastUpdateTs = lastUpdateTs; + this.version = null; + } + + public BaseAttributeKvEntry(KvEntry kv, long lastUpdateTs, Long version) { + this.kv = kv; + this.lastUpdateTs = lastUpdateTs; + this.version = version; } public BaseAttributeKvEntry(long lastUpdateTs, KvEntry kv) { @@ -88,6 +97,11 @@ public class BaseAttributeKvEntry implements AttributeKvEntry { return kv.getValue(); } + @Override + public long getVersion() { + return version; + } + @Override public boolean equals(Object o) { if (this == o) return true; diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/util/TbPair.java b/common/data/src/main/java/org/thingsboard/server/common/data/util/TbPair.java index 6e8b2e5696..315131bdad 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/util/TbPair.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/util/TbPair.java @@ -21,10 +21,17 @@ import lombok.Data; @Data @AllArgsConstructor public class TbPair { + public static final TbPair EMPTY = new TbPair<>(null, null); + private S first; private T second; public static TbPair of(S first, T second) { return new TbPair<>(first, second); } + + @SuppressWarnings("unchecked") + public static TbPair emptyPair() { + return (TbPair) EMPTY; + } } diff --git a/dao/src/main/java/org/thingsboard/server/dao/AbstractVersionedInsertRepository.java b/dao/src/main/java/org/thingsboard/server/dao/AbstractVersionedInsertRepository.java index cb92ca5e1f..6380957f77 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/AbstractVersionedInsertRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/AbstractVersionedInsertRepository.java @@ -29,9 +29,9 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; -public abstract class AbstractVersionedInsertRepository extends AbstractInsertRepository { +import static org.thingsboard.server.dao.model.ModelConstants.VERSION_COLUMN; - public static final String VERSION_COLUMN = "version"; +public abstract class AbstractVersionedInsertRepository extends AbstractInsertRepository { public List saveOrUpdate(List entities) { return transactionTemplate.execute(status -> { @@ -51,7 +51,7 @@ public abstract class AbstractVersionedInsertRepository extends AbstractInser for (int i = 0; i < updateResult.length; i++) { if (updateResult[i] == 0) { insertEntities.add(entities.get(i)); - seqNumbers.add(0L); + seqNumbers.add(null); toInsertIndexes.add(i); } else { seqNumbers.add((Long) seqNumbersList.get(keyHolderIndex).get(VERSION_COLUMN)); @@ -63,12 +63,14 @@ public abstract class AbstractVersionedInsertRepository extends AbstractInser return seqNumbers; } - onInsertOrUpdate(insertEntities, keyHolder); + int[] insertResult = onInsertOrUpdate(insertEntities, keyHolder); seqNumbersList = keyHolder.getKeyList(); - for (int i = 0; i < seqNumbersList.size(); i++) { - seqNumbers.set(toInsertIndexes.get(i), (Long) seqNumbersList.get(i).get(VERSION_COLUMN)); + for (int i = 0; i < insertResult.length; i++) { + if (updateResult[i] != 0) { + seqNumbers.set(toInsertIndexes.get(i), (Long) seqNumbersList.get(i).get(VERSION_COLUMN)); + } } return seqNumbers; @@ -89,8 +91,8 @@ public abstract class AbstractVersionedInsertRepository extends AbstractInser }, keyHolder); } - private void onInsertOrUpdate(List insertEntities, KeyHolder keyHolder) { - jdbcTemplate.batchUpdate(new SequencePreparedStatementCreator(getInsertOrUpdateQuery()), new BatchPreparedStatementSetter() { + private int[] onInsertOrUpdate(List insertEntities, KeyHolder keyHolder) { + return jdbcTemplate.batchUpdate(new SequencePreparedStatementCreator(getInsertOrUpdateQuery()), new BatchPreparedStatementSetter() { @Override public void setValues(PreparedStatement ps, int i) throws SQLException { setOnInsertOrUpdateValues(ps, i, insertEntities); diff --git a/dao/src/main/java/org/thingsboard/server/dao/attributes/AttributeCaffeineCache.java b/dao/src/main/java/org/thingsboard/server/dao/attributes/AttributeCaffeineCache.java index 7df55a33fb..e00ea8d11c 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/attributes/AttributeCaffeineCache.java +++ b/dao/src/main/java/org/thingsboard/server/dao/attributes/AttributeCaffeineCache.java @@ -18,13 +18,13 @@ package org.thingsboard.server.dao.attributes; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.cache.CacheManager; import org.springframework.stereotype.Service; -import org.thingsboard.server.cache.CaffeineTbTransactionalCache; +import org.thingsboard.server.cache.VersionedCaffeineTbTransactionalCache; import org.thingsboard.server.common.data.CacheConstants; import org.thingsboard.server.common.data.kv.AttributeKvEntry; @ConditionalOnProperty(prefix = "cache", value = "type", havingValue = "caffeine", matchIfMissing = true) @Service("AttributeCache") -public class AttributeCaffeineCache extends CaffeineTbTransactionalCache { +public class AttributeCaffeineCache extends VersionedCaffeineTbTransactionalCache { public AttributeCaffeineCache(CacheManager cacheManager) { super(cacheManager, CacheConstants.ATTRIBUTES_CACHE); diff --git a/dao/src/main/java/org/thingsboard/server/dao/attributes/AttributeRedisCache.java b/dao/src/main/java/org/thingsboard/server/dao/attributes/AttributeRedisCache.java index 7932a8b8d4..68caf62ceb 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/attributes/AttributeRedisCache.java +++ b/dao/src/main/java/org/thingsboard/server/dao/attributes/AttributeRedisCache.java @@ -21,9 +21,9 @@ import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.serializer.SerializationException; import org.springframework.stereotype.Service; import org.thingsboard.server.cache.CacheSpecsMap; -import org.thingsboard.server.cache.RedisTbTransactionalCache; import org.thingsboard.server.cache.TBRedisCacheConfiguration; import org.thingsboard.server.cache.TbRedisSerializer; +import org.thingsboard.server.cache.VersionedRedisTbTransactionalCache; import org.thingsboard.server.common.data.CacheConstants; import org.thingsboard.server.common.data.kv.AttributeKvEntry; import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry; @@ -38,7 +38,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.KeyValueType; @ConditionalOnProperty(prefix = "cache", value = "type", havingValue = "redis") @Service("AttributeCache") -public class AttributeRedisCache extends RedisTbTransactionalCache { +public class AttributeRedisCache extends VersionedRedisTbTransactionalCache { public AttributeRedisCache(TBRedisCacheConfiguration configuration, CacheSpecsMap cacheSpecsMap, RedisConnectionFactory connectionFactory) { super(CacheConstants.ATTRIBUTES_CACHE, cacheSpecsMap, connectionFactory, configuration, new TbRedisSerializer<>() { @@ -82,26 +82,15 @@ public class AttributeRedisCache extends RedisTbTransactionalCache new BooleanDataEntry(key.getKey(), hasValue ? proto.getBoolV() : null); + case LONG_V -> new LongDataEntry(key.getKey(), hasValue ? proto.getLongV() : null); + case DOUBLE_V -> new DoubleDataEntry(key.getKey(), hasValue ? proto.getDoubleV() : null); + case STRING_V -> new StringDataEntry(key.getKey(), hasValue ? proto.getStringV() : null); + case JSON_V -> new JsonDataEntry(key.getKey(), hasValue ? proto.getJsonV() : null); + default -> + throw new InvalidProtocolBufferException("Unrecognized type: " + proto.getType() + " !"); + }; return new BaseAttributeKvEntry(proto.getLastUpdateTs(), entry); } catch (InvalidProtocolBufferException e) { throw new SerializationException(e.getMessage()); diff --git a/dao/src/main/java/org/thingsboard/server/dao/attributes/AttributesDao.java b/dao/src/main/java/org/thingsboard/server/dao/attributes/AttributesDao.java index 34f67fb5ff..1805f72a8f 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/attributes/AttributesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/attributes/AttributesDao.java @@ -22,6 +22,7 @@ import org.thingsboard.server.common.data.id.DeviceProfileId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.AttributeKvEntry; +import org.thingsboard.server.common.data.util.TbPair; import java.util.Collection; import java.util.List; @@ -42,6 +43,8 @@ public interface AttributesDao { List> removeAll(TenantId tenantId, EntityId entityId, AttributeScope attributeScope, List keys); + List>> removeAllWithVersions(TenantId tenantId, EntityId entityId, AttributeScope attributeScope, List keys); + List findAllKeysByDeviceProfileId(TenantId tenantId, DeviceProfileId deviceProfileId); List findAllKeysByEntityIds(TenantId tenantId, List entityIds); diff --git a/dao/src/main/java/org/thingsboard/server/dao/attributes/CachedAttributesService.java b/dao/src/main/java/org/thingsboard/server/dao/attributes/CachedAttributesService.java index 375b56b1f6..fb5c745a11 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/attributes/CachedAttributesService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/attributes/CachedAttributesService.java @@ -27,13 +27,14 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Primary; import org.springframework.stereotype.Service; import org.thingsboard.server.cache.TbCacheValueWrapper; -import org.thingsboard.server.cache.TbTransactionalCache; +import org.thingsboard.server.cache.VersionedTbTransactionalCache; import org.thingsboard.server.common.data.AttributeScope; import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.data.id.DeviceProfileId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.AttributeKvEntry; +import org.thingsboard.server.common.data.util.TbPair; import org.thingsboard.server.common.stats.DefaultCounter; import org.thingsboard.server.common.stats.StatsFactory; import org.thingsboard.server.dao.cache.CacheExecutorService; @@ -67,7 +68,7 @@ public class CachedAttributesService implements AttributesService { private final CacheExecutorService cacheExecutorService; private final DefaultCounter hitCounter; private final DefaultCounter missCounter; - private final TbTransactionalCache cache; + private final VersionedTbTransactionalCache cache; private ListeningExecutorService cacheExecutor; @Value("${cache.type:caffeine}") @@ -79,7 +80,7 @@ public class CachedAttributesService implements AttributesService { JpaExecutorService jpaExecutorService, StatsFactory statsFactory, CacheExecutorService cacheExecutorService, - TbTransactionalCache cache) { + VersionedTbTransactionalCache cache) { this.attributesDao = attributesDao; this.jpaExecutorService = jpaExecutorService; this.cacheExecutorService = cacheExecutorService; @@ -122,17 +123,9 @@ public class CachedAttributesService implements AttributesService { return Optional.ofNullable(cachedAttributeKvEntry); } else { missCounter.increment(); - var cacheTransaction = cache.newTransactionForKey(attributeCacheKey); - try { - Optional result = attributesDao.find(tenantId, entityId, scope, attributeKey); - cacheTransaction.putIfAbsent(attributeCacheKey, result.orElse(null)); - cacheTransaction.commit(); - return result; - } catch (Throwable e) { - cacheTransaction.rollback(); - log.debug("Could not find attribute from cache: [{}] [{}] [{}]", entityId, scope, attributeKey, e); - throw e; - } + Optional result = attributesDao.find(tenantId, entityId, scope, attributeKey); + cache.put(attributeCacheKey, result.orElse(null)); + return result; } }); } @@ -163,28 +156,19 @@ public class CachedAttributesService implements AttributesService { // DB call should run in DB executor, not in cache-related executor return jpaExecutorService.submit(() -> { - var cacheTransaction = cache.newTransactionForKeys(notFoundKeys); - try { - log.trace("[{}][{}] Lookup attributes from db: {}", entityId, scope, notFoundAttributeKeys); - List result = attributesDao.find(tenantId, entityId, scope, notFoundAttributeKeys); - for (AttributeKvEntry foundInDbAttribute : result) { - AttributeCacheKey attributeCacheKey = new AttributeCacheKey(scope, entityId, foundInDbAttribute.getKey()); - cacheTransaction.putIfAbsent(attributeCacheKey, foundInDbAttribute); - notFoundAttributeKeys.remove(foundInDbAttribute.getKey()); - } - for (String key : notFoundAttributeKeys) { - cacheTransaction.putIfAbsent(new AttributeCacheKey(scope, entityId, key), null); - } - List mergedAttributes = new ArrayList<>(cachedAttributes); - mergedAttributes.addAll(result); - cacheTransaction.commit(); - log.trace("[{}][{}] Commit cache transaction: {}", entityId, scope, notFoundAttributeKeys); - return mergedAttributes; - } catch (Throwable e) { - cacheTransaction.rollback(); - log.debug("Could not find attributes from cache: [{}] [{}] [{}]", entityId, scope, notFoundAttributeKeys, e); - throw e; + log.trace("[{}][{}] Lookup attributes from db: {}", entityId, scope, notFoundAttributeKeys); + List result = attributesDao.find(tenantId, entityId, scope, notFoundAttributeKeys); + for (AttributeKvEntry foundInDbAttribute : result) { + put(entityId, scope, foundInDbAttribute, foundInDbAttribute.getVersion()); + notFoundAttributeKeys.remove(foundInDbAttribute.getKey()); } + for (String key : notFoundAttributeKeys) { + cache.put(new AttributeCacheKey(scope, entityId, key), null); + } + List mergedAttributes = new ArrayList<>(cachedAttributes); + mergedAttributes.addAll(result); + log.trace("[{}][{}] Commit cache transaction: {}", entityId, scope, notFoundAttributeKeys); + return mergedAttributes; }); }, MoreExecutors.directExecutor()); // cacheExecutor analyse and returns results or submit to DB executor @@ -235,7 +219,7 @@ public class CachedAttributesService implements AttributesService { validate(entityId, scope); AttributeUtils.validate(attribute, valueNoXssValidation); ListenableFuture future = attributesDao.save(tenantId, entityId, scope, attribute); - return Futures.transform(future, version -> evict(entityId, scope, attribute, version), cacheExecutor); + return Futures.transform(future, version -> put(entityId, scope, attribute, version), cacheExecutor); } @Override @@ -251,17 +235,17 @@ public class CachedAttributesService implements AttributesService { List> futures = new ArrayList<>(attributes.size()); for (var attribute : attributes) { ListenableFuture future = attributesDao.save(tenantId, entityId, scope, attribute); - futures.add(Futures.transform(future, version -> evict(entityId, scope, attribute, version), cacheExecutor)); + futures.add(Futures.transform(future, version -> put(entityId, scope, attribute, version), cacheExecutor)); } return Futures.allAsList(futures); } - private Long evict(EntityId entityId, AttributeScope scope, AttributeKvEntry attribute, Long version) { + private Long put(EntityId entityId, AttributeScope scope, AttributeKvEntry attribute, Long version) { String key = attribute.getKey(); - log.trace("[{}][{}][{}] Before cache evict: {}", entityId, scope, key, attribute); - cache.evictOrPut(new AttributeCacheKey(scope, entityId, key), attribute); - log.trace("[{}][{}][{}] after cache evict.", entityId, scope, key); + log.trace("[{}][{}][{}] Before cache put: {}", entityId, scope, key, attribute); + cache.put(new AttributeCacheKey(scope, entityId, key), attribute, version); + log.trace("[{}][{}][{}] after cache put.", entityId, scope, key); return version; } @@ -273,9 +257,10 @@ public class CachedAttributesService implements AttributesService { @Override public ListenableFuture> removeAll(TenantId tenantId, EntityId entityId, AttributeScope scope, List attributeKeys) { validate(entityId, scope); - List> futures = attributesDao.removeAll(tenantId, entityId, scope, attributeKeys); - return Futures.allAsList(futures.stream().map(future -> Futures.transform(future, key -> { - cache.evict(new AttributeCacheKey(scope, entityId, key)); + List>> futures = attributesDao.removeAllWithVersions(tenantId, entityId, scope, attributeKeys); + return Futures.allAsList(futures.stream().map(future -> Futures.transform(future, keyVersionPair -> { + String key = keyVersionPair.getFirst(); + cache.evict(new AttributeCacheKey(scope, entityId, key), keyVersionPair.getSecond()); return key; }, cacheExecutor)).collect(Collectors.toList())); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/model/BaseVersionedEntity.java b/dao/src/main/java/org/thingsboard/server/dao/model/BaseVersionedEntity.java new file mode 100644 index 0000000000..6b98348de9 --- /dev/null +++ b/dao/src/main/java/org/thingsboard/server/dao/model/BaseVersionedEntity.java @@ -0,0 +1,20 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.model; + +public interface BaseVersionedEntity { + long getVersion(); +} diff --git a/dao/src/main/java/org/thingsboard/server/dao/model/ModelConstants.java b/dao/src/main/java/org/thingsboard/server/dao/model/ModelConstants.java index fb77ad6987..643c099829 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/model/ModelConstants.java +++ b/dao/src/main/java/org/thingsboard/server/dao/model/ModelConstants.java @@ -56,6 +56,7 @@ public class ModelConstants { public static final String ATTRIBUTE_TYPE_COLUMN = "attribute_type"; public static final String ATTRIBUTE_KEY_COLUMN = "attribute_key"; public static final String LAST_UPDATE_TS_COLUMN = "last_update_ts"; + public static final String VERSION_COLUMN = "version"; /** * User constants. diff --git a/dao/src/main/java/org/thingsboard/server/dao/model/sql/AttributeKvEntity.java b/dao/src/main/java/org/thingsboard/server/dao/model/sql/AttributeKvEntity.java index 03aa0f98ad..d7bd23db4d 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/model/sql/AttributeKvEntity.java +++ b/dao/src/main/java/org/thingsboard/server/dao/model/sql/AttributeKvEntity.java @@ -16,6 +16,7 @@ package org.thingsboard.server.dao.model.sql; import lombok.Data; +import lombok.EqualsAndHashCode; import org.thingsboard.server.common.data.kv.AttributeKvEntry; import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry; import org.thingsboard.server.common.data.kv.BooleanDataEntry; @@ -41,9 +42,10 @@ import static org.thingsboard.server.dao.model.ModelConstants.LONG_VALUE_COLUMN; import static org.thingsboard.server.dao.model.ModelConstants.STRING_VALUE_COLUMN; @Data +@EqualsAndHashCode(callSuper = true) @Entity @Table(name = "attribute_kv") -public class AttributeKvEntity implements ToData, Serializable { +public class AttributeKvEntity extends VersionedEntity implements ToData, Serializable { @EmbeddedId private AttributeKvCompositeKey id; @@ -84,6 +86,6 @@ public class AttributeKvEntity implements ToData, Serializable kvEntry = new JsonDataEntry(strKey, jsonValue); } - return new BaseAttributeKvEntry(kvEntry, lastUpdateTs); + return new BaseAttributeKvEntry(kvEntry, lastUpdateTs, version); } } diff --git a/dao/src/main/java/org/thingsboard/server/dao/model/sql/VersionedEntity.java b/dao/src/main/java/org/thingsboard/server/dao/model/sql/VersionedEntity.java new file mode 100644 index 0000000000..2dfee6466a --- /dev/null +++ b/dao/src/main/java/org/thingsboard/server/dao/model/sql/VersionedEntity.java @@ -0,0 +1,30 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.model.sql; + +import jakarta.persistence.Column; +import jakarta.persistence.MappedSuperclass; +import lombok.Data; + +import static org.thingsboard.server.dao.model.ModelConstants.VERSION_COLUMN; + +@Data +@MappedSuperclass +public abstract class VersionedEntity { + + @Column(name = VERSION_COLUMN) + protected Long version; +} diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/AttributeKvInsertRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/AttributeKvInsertRepository.java index c2b9d53d47..7050e61144 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/AttributeKvInsertRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/AttributeKvInsertRepository.java @@ -31,14 +31,14 @@ import java.util.List; @SqlDao public class AttributeKvInsertRepository extends AbstractVersionedInsertRepository { - private static final String BATCH_UPDATE = "UPDATE attribute_kv SET str_v = ?, long_v = ?, dbl_v = ?, bool_v = ?, json_v = cast(? AS json), last_update_ts = ?, version = version + 1 " + + private static final String BATCH_UPDATE = "UPDATE attribute_kv SET str_v = ?, long_v = ?, dbl_v = ?, bool_v = ?, json_v = cast(? AS json), last_update_ts = ?, version = nextval('attribute_kv_version_seq') " + "WHERE entity_id = ? and attribute_type =? and attribute_key = ? RETURNING version;"; private static final String INSERT_OR_UPDATE = - "INSERT INTO attribute_kv (entity_id, attribute_type, attribute_key, str_v, long_v, dbl_v, bool_v, json_v, last_update_ts) " + - "VALUES(?, ?, ?, ?, ?, ?, ?, cast(? AS json), ?) " + + "INSERT INTO attribute_kv (entity_id, attribute_type, attribute_key, str_v, long_v, dbl_v, bool_v, json_v, last_update_ts, version) " + + "VALUES(?, ?, ?, ?, ?, ?, ?, cast(? AS json), ?, nextval('attribute_kv_version_seq')) " + "ON CONFLICT (entity_id, attribute_type, attribute_key) " + - "DO UPDATE SET str_v = ?, long_v = ?, dbl_v = ?, bool_v = ?, json_v = cast(? AS json), last_update_ts = ?, version = attribute_kv.version + 1 RETURNING version;"; + "DO UPDATE SET str_v = ?, long_v = ?, dbl_v = ?, bool_v = ?, json_v = cast(? AS json), last_update_ts = ?, version = nextval('attribute_kv_version_seq') RETURNING version;"; @Override protected void setOnBatchUpdateValues(PreparedStatement ps, int i, List entities) throws SQLException { diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/AttributeKvRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/AttributeKvRepository.java index ddd6c3c881..beea5f2f2f 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/AttributeKvRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/AttributeKvRepository.java @@ -30,15 +30,15 @@ public interface AttributeKvRepository extends JpaRepository findAllEntityIdAndAttributeType(@Param("entityId") UUID entityId, - @Param("attributeType") int attributeType); + List findAllByEntityIdAndAttributeType(@Param("entityId") UUID entityId, + @Param("attributeType") int attributeType); @Transactional @Modifying - @Query("DELETE FROM AttributeKvEntity a WHERE a.id.entityId = :entityId " + - "AND a.id.attributeType = :attributeType " + - "AND a.id.attributeKey = :attributeKey") - void delete(@Param("entityId") UUID entityId, + @Query(value = "DELETE FROM attribute_kv WHERE entity_id = :entityId " + + "AND attribute_type = :attributeType " + + "AND attribute_key = :attributeKey RETURNING version", nativeQuery = true) + Long delete(@Param("entityId") UUID entityId, @Param("attributeType") int attributeType, @Param("attributeKey") int attributeKey); diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/JpaAttributeDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/JpaAttributeDao.java index 05cdfce221..de4487381b 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/JpaAttributeDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/JpaAttributeDao.java @@ -30,6 +30,7 @@ import org.thingsboard.server.common.data.id.DeviceProfileId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.AttributeKvEntry; +import org.thingsboard.server.common.data.util.TbPair; import org.thingsboard.server.common.stats.StatsFactory; import org.thingsboard.server.dao.DaoUtil; import org.thingsboard.server.dao.attributes.AttributesDao; @@ -144,7 +145,7 @@ public class JpaAttributeDao extends JpaAbstractDaoListeningExecutorService impl @Override public List findAll(TenantId tenantId, EntityId entityId, AttributeScope attributeScope) { - List attributes = attributeKvRepository.findAllEntityIdAndAttributeType( + List attributes = attributeKvRepository.findAllByEntityIdAndAttributeType( entityId.getId(), attributeScope.getId()); attributes.forEach(attributeKvEntity -> attributeKvEntity.setStrKey(keyDictionaryDao.getKey(attributeKvEntity.getId().getAttributeKey()))); @@ -205,6 +206,18 @@ public class JpaAttributeDao extends JpaAbstractDaoListeningExecutorService impl return futuresList; } + @Override + public List>> removeAllWithVersions(TenantId tenantId, EntityId entityId, AttributeScope attributeScope, List keys) { + List>> futuresList = new ArrayList<>(keys.size()); + for (String key : keys) { + futuresList.add(service.submit(() -> { + Long version = attributeKvRepository.delete(entityId.getId(), attributeScope.getId(), keyDictionaryDao.getOrSaveKeyId(key)); + return TbPair.of(key, version); + })); + } + return futuresList; + } + @Transactional @Override public List> removeAllByEntityId(TenantId tenantId, EntityId entityId) { diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/latest/sql/SqlLatestInsertTsRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/latest/sql/SqlLatestInsertTsRepository.java index 0484337a08..0a3ffad09f 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/latest/sql/SqlLatestInsertTsRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/latest/sql/SqlLatestInsertTsRepository.java @@ -40,11 +40,11 @@ public class SqlLatestInsertTsRepository extends AbstractVersionedInsertReposito private Boolean updateByLatestTs; private static final String BATCH_UPDATE = - "UPDATE ts_kv_latest SET ts = ?, bool_v = ?, str_v = ?, long_v = ?, dbl_v = ?, json_v = cast(? AS json), version = version + 1 WHERE entity_id = ? AND key = ?"; + "UPDATE ts_kv_latest SET ts = ?, bool_v = ?, str_v = ?, long_v = ?, dbl_v = ?, json_v = cast(? AS json), version = nextval('ts_kv_latest_version_seq') WHERE entity_id = ? AND key = ?"; private static final String INSERT_OR_UPDATE = - "INSERT INTO ts_kv_latest (entity_id, key, ts, bool_v, str_v, long_v, dbl_v, json_v) VALUES(?, ?, ?, ?, ?, ?, ?, cast(? AS json)) " + - "ON CONFLICT (entity_id, key) DO UPDATE SET ts = ?, bool_v = ?, str_v = ?, long_v = ?, dbl_v = ?, json_v = cast(? AS json), version = ts_kv_latest.version + 1"; + "INSERT INTO ts_kv_latest (entity_id, key, ts, bool_v, str_v, long_v, dbl_v, json_v, version) VALUES(?, ?, ?, ?, ?, ?, ?, cast(? AS json), nextval('ts_kv_latest_version_seq')) " + + "ON CONFLICT (entity_id, key) DO UPDATE SET ts = ?, bool_v = ?, str_v = ?, long_v = ?, dbl_v = ?, json_v = cast(? AS json), version = nextval('ts_kv_latest_version_seq')"; private static final String BATCH_UPDATE_BY_LATEST_TS = BATCH_UPDATE + " AND ts_kv_latest.ts <= ?"; diff --git a/dao/src/main/resources/sql/schema-entities.sql b/dao/src/main/resources/sql/schema-entities.sql index 9293636f0b..c2e3a2b108 100644 --- a/dao/src/main/resources/sql/schema-entities.sql +++ b/dao/src/main/resources/sql/schema-entities.sql @@ -102,6 +102,8 @@ CREATE TABLE IF NOT EXISTS audit_log ( action_failure_details varchar(1000000) ) PARTITION BY RANGE (created_time); +CREATE SEQUENCE IF NOT EXISTS attribute_kv_version_seq cache 1000; + CREATE TABLE IF NOT EXISTS attribute_kv ( entity_id uuid, attribute_type int, @@ -112,7 +114,7 @@ CREATE TABLE IF NOT EXISTS attribute_kv ( dbl_v double precision, json_v json, last_update_ts bigint, - version bigint default 0, + version bigint default 1, CONSTRAINT attribute_kv_pkey PRIMARY KEY (entity_id, attribute_type, attribute_key) ); @@ -539,6 +541,8 @@ CREATE TABLE IF NOT EXISTS entity_view ( CONSTRAINT entity_view_external_id_unq_key UNIQUE (tenant_id, external_id) ); +CREATE SEQUENCE IF NOT EXISTS ts_kv_latest_version_seq cache 1000; + CREATE TABLE IF NOT EXISTS ts_kv_latest ( entity_id uuid NOT NULL, @@ -549,7 +553,7 @@ CREATE TABLE IF NOT EXISTS ts_kv_latest long_v bigint, dbl_v double precision, json_v json, - version bigint default 0, + version bigint default 1, CONSTRAINT ts_kv_latest_pkey PRIMARY KEY (entity_id, key) ); diff --git a/dao/src/main/resources/sql/schema-timescale.sql b/dao/src/main/resources/sql/schema-timescale.sql index 2d2381403d..7c2a99fba3 100644 --- a/dao/src/main/resources/sql/schema-timescale.sql +++ b/dao/src/main/resources/sql/schema-timescale.sql @@ -34,6 +34,8 @@ CREATE TABLE IF NOT EXISTS key_dictionary ( CONSTRAINT key_dictionary_id_pkey PRIMARY KEY (key) ); +CREATE SEQUENCE IF NOT EXISTS ts_kv_latest_version_seq cache 1000; + CREATE TABLE IF NOT EXISTS ts_kv_latest ( entity_id uuid NOT NULL, key int NOT NULL, @@ -43,7 +45,7 @@ CREATE TABLE IF NOT EXISTS ts_kv_latest ( long_v bigint, dbl_v double precision, json_v json, - version bigint default 0, + version bigint default 1, CONSTRAINT ts_kv_latest_pkey PRIMARY KEY (entity_id, key) ); diff --git a/dao/src/main/resources/sql/schema-ts-latest-psql.sql b/dao/src/main/resources/sql/schema-ts-latest-psql.sql index 058104d4f2..d50571c091 100644 --- a/dao/src/main/resources/sql/schema-ts-latest-psql.sql +++ b/dao/src/main/resources/sql/schema-ts-latest-psql.sql @@ -14,6 +14,8 @@ -- limitations under the License. -- +CREATE SEQUENCE IF NOT EXISTS ts_kv_latest_version_seq cache 1000; + CREATE TABLE IF NOT EXISTS ts_kv_latest ( entity_id uuid NOT NULL, @@ -24,6 +26,6 @@ CREATE TABLE IF NOT EXISTS ts_kv_latest long_v bigint, dbl_v double precision, json_v json, - version bigint default 0, + version bigint default 1, CONSTRAINT ts_kv_latest_pkey PRIMARY KEY (entity_id, key) -); \ No newline at end of file +); diff --git a/dao/src/test/resources/sql/psql/drop-all-tables.sql b/dao/src/test/resources/sql/psql/drop-all-tables.sql index 9c772df45b..e1c35d7a21 100644 --- a/dao/src/test/resources/sql/psql/drop-all-tables.sql +++ b/dao/src/test/resources/sql/psql/drop-all-tables.sql @@ -23,6 +23,7 @@ DROP TABLE IF EXISTS alarm_type; DROP TABLE IF EXISTS asset; DROP TABLE IF EXISTS audit_log; DROP TABLE IF EXISTS attribute_kv; +DROP SEQUENCE IF EXISTS attribute_kv_version_seq; DROP TABLE IF EXISTS component_descriptor; DROP TABLE IF EXISTS customer; DROP TABLE IF EXISTS device; @@ -36,6 +37,7 @@ DROP TABLE IF EXISTS relation; DROP TABLE IF EXISTS tenant; DROP TABLE IF EXISTS ts_kv; DROP TABLE IF EXISTS ts_kv_latest; +DROP SEQUENCE IF EXISTS ts_kv_latest_version_seq; DROP TABLE IF EXISTS ts_kv_dictionary; DROP TABLE IF EXISTS user_credentials; DROP TABLE IF EXISTS widgets_bundle_widget;