LUA script improvements and used versioned cache for latest ts
This commit is contained in:
parent
6c417f209f
commit
0b51baf38d
@ -80,7 +80,7 @@ public abstract class RedisTbTransactionalCache<K extends Serializable, V extend
|
|||||||
try (var connection = connectionFactory.getConnection()) {
|
try (var connection = connectionFactory.getConnection()) {
|
||||||
byte[] rawKey = getRawKey(key);
|
byte[] rawKey = getRawKey(key);
|
||||||
byte[] rawValue = doGet(connection, rawKey);
|
byte[] rawValue = doGet(connection, rawKey);
|
||||||
if (rawValue == null) {
|
if (rawValue == null || rawValue.length == 0) {
|
||||||
return null;
|
return null;
|
||||||
} else if (Arrays.equals(rawValue, BINARY_NULL_VALUE)) {
|
} else if (Arrays.equals(rawValue, BINARY_NULL_VALUE)) {
|
||||||
return SimpleTbCacheValueWrapper.empty();
|
return SimpleTbCacheValueWrapper.empty();
|
||||||
|
|||||||
@ -44,17 +44,25 @@ public abstract class VersionedRedisTbCache<K extends Serializable, V extends Se
|
|||||||
local expiration = tonumber(ARGV[3])
|
local expiration = tonumber(ARGV[3])
|
||||||
|
|
||||||
local function setNewValue()
|
local function setNewValue()
|
||||||
local newValueWithVersion = struct.pack(">I8", newVersion) .. newValue:sub(9)
|
local newValueWithVersion = struct.pack(">I8", newVersion) .. newValue
|
||||||
redis.call('SET', key, newValueWithVersion, 'EX', expiration)
|
redis.call('SET', key, newValueWithVersion, 'EX', expiration)
|
||||||
end
|
end
|
||||||
|
|
||||||
|
local function bytes_to_number(bytes)
|
||||||
|
local n = 0
|
||||||
|
for i = 1, 8 do
|
||||||
|
n = n * 256 + string.byte(bytes, i)
|
||||||
|
end
|
||||||
|
return n
|
||||||
|
end
|
||||||
|
|
||||||
-- Get the current version (first 8 bytes) of the current value
|
-- Get the current version (first 8 bytes) of the current value
|
||||||
local currentVersionBytes = redis.call('GETRANGE', key, 0, 7)
|
local currentVersionBytes = redis.call('GETRANGE', key, 0, 7)
|
||||||
|
|
||||||
if currentVersionBytes and #currentVersionBytes == 8 then
|
if currentVersionBytes and #currentVersionBytes == 8 then
|
||||||
local currentVersion = tonumber(struct.unpack(">I8", currentVersionBytes))
|
local currentVersion = bytes_to_number(currentVersionBytes)
|
||||||
|
|
||||||
if newVersion > currentVersion then
|
if newVersion > currentVersion or newVersion == 1 and currentVersion > 1 then
|
||||||
setNewValue()
|
setNewValue()
|
||||||
end
|
end
|
||||||
else
|
else
|
||||||
@ -62,7 +70,7 @@ public abstract class VersionedRedisTbCache<K extends Serializable, V extends Se
|
|||||||
setNewValue()
|
setNewValue()
|
||||||
end
|
end
|
||||||
""");
|
""");
|
||||||
static final byte[] SET_VERSIONED_VALUE_SHA = StringRedisSerializer.UTF_8.serialize("1d0cb3f1d1f899b8e456789fc5000196d5bb3025");
|
static final byte[] SET_VERSIONED_VALUE_SHA = StringRedisSerializer.UTF_8.serialize("05a09f34f523429c96c6eaabbe6f2595f5cba2c3");
|
||||||
|
|
||||||
public VersionedRedisTbCache(String cacheName, CacheSpecsMap cacheSpecsMap, RedisConnectionFactory connectionFactory, TBRedisCacheConfiguration configuration, TbRedisSerializer<K, V> valueSerializer) {
|
public VersionedRedisTbCache(String cacheName, CacheSpecsMap cacheSpecsMap, RedisConnectionFactory connectionFactory, TBRedisCacheConfiguration configuration, TbRedisSerializer<K, V> valueSerializer) {
|
||||||
super(cacheName, cacheSpecsMap, connectionFactory, configuration, valueSerializer);
|
super(cacheName, cacheSpecsMap, connectionFactory, configuration, valueSerializer);
|
||||||
|
|||||||
@ -13,7 +13,7 @@
|
|||||||
* See the License for the specific language governing permissions and
|
* See the License for the specific language governing permissions and
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
package org.thingsboard.server.dao.timeseries;
|
package org.thingsboard.server.cache;
|
||||||
|
|
||||||
import lombok.SneakyThrows;
|
import lombok.SneakyThrows;
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
@ -22,11 +22,11 @@ import java.security.MessageDigest;
|
|||||||
|
|
||||||
import static org.assertj.core.api.Assertions.assertThat;
|
import static org.assertj.core.api.Assertions.assertThat;
|
||||||
|
|
||||||
class TsLatestRedisCacheTest {
|
class VersionedRedisTbCacheTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
void testUpsertTsLatestLUAScriptHash() {
|
void testUpsertTsLatestLUAScriptHash() {
|
||||||
assertThat(getSHA1(TsLatestRedisCache.UPSERT_TS_LATEST_LUA_SCRIPT)).isEqualTo(new String(TsLatestRedisCache.UPSERT_TS_LATEST_SHA));
|
assertThat(getSHA1(VersionedRedisTbCache.SET_VERSIONED_VALUE_LUA_SCRIPT)).isEqualTo(new String(VersionedRedisTbCache.SET_VERSIONED_VALUE_SHA));
|
||||||
}
|
}
|
||||||
|
|
||||||
@SneakyThrows
|
@SneakyThrows
|
||||||
@ -16,5 +16,5 @@
|
|||||||
package org.thingsboard.server.common.data;
|
package org.thingsboard.server.common.data;
|
||||||
|
|
||||||
public interface HasVersion {
|
public interface HasVersion {
|
||||||
long getVersion();
|
Long getVersion();
|
||||||
}
|
}
|
||||||
|
|||||||
@ -98,7 +98,7 @@ public class BaseAttributeKvEntry implements AttributeKvEntry {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public long getVersion() {
|
public Long getVersion() {
|
||||||
return version;
|
return version;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -25,9 +25,18 @@ public class BasicTsKvEntry implements TsKvEntry {
|
|||||||
@Valid
|
@Valid
|
||||||
private final KvEntry kv;
|
private final KvEntry kv;
|
||||||
|
|
||||||
|
private final Long version;
|
||||||
|
|
||||||
public BasicTsKvEntry(long ts, KvEntry kv) {
|
public BasicTsKvEntry(long ts, KvEntry kv) {
|
||||||
this.ts = ts;
|
this.ts = ts;
|
||||||
this.kv = kv;
|
this.kv = kv;
|
||||||
|
this.version = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
public BasicTsKvEntry(long ts, KvEntry kv, Long version) {
|
||||||
|
this.ts = ts;
|
||||||
|
this.kv = kv;
|
||||||
|
this.version = version;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -118,4 +127,8 @@ public class BasicTsKvEntry implements TsKvEntry {
|
|||||||
return Math.max(1, (length + MAX_CHARS_PER_DATA_POINT - 1) / MAX_CHARS_PER_DATA_POINT);
|
return Math.max(1, (length + MAX_CHARS_PER_DATA_POINT - 1) / MAX_CHARS_PER_DATA_POINT);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Long getVersion() {
|
||||||
|
return version;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -16,6 +16,7 @@
|
|||||||
package org.thingsboard.server.common.data.kv;
|
package org.thingsboard.server.common.data.kv;
|
||||||
|
|
||||||
import com.fasterxml.jackson.annotation.JsonIgnore;
|
import com.fasterxml.jackson.annotation.JsonIgnore;
|
||||||
|
import org.thingsboard.server.common.data.HasVersion;
|
||||||
import org.thingsboard.server.common.data.query.TsValue;
|
import org.thingsboard.server.common.data.query.TsValue;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -24,7 +25,7 @@ import org.thingsboard.server.common.data.query.TsValue;
|
|||||||
* @author ashvayka
|
* @author ashvayka
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
public interface TsKvEntry extends KvEntry {
|
public interface TsKvEntry extends KvEntry, HasVersion {
|
||||||
|
|
||||||
long getTs();
|
long getTs();
|
||||||
|
|
||||||
|
|||||||
@ -22,15 +22,22 @@ public class TsKvLatestRemovingResult {
|
|||||||
private String key;
|
private String key;
|
||||||
private TsKvEntry data;
|
private TsKvEntry data;
|
||||||
private boolean removed;
|
private boolean removed;
|
||||||
|
private Long version;
|
||||||
|
|
||||||
public TsKvLatestRemovingResult(TsKvEntry data) {
|
public TsKvLatestRemovingResult(String key, boolean removed) {
|
||||||
|
this(key, removed, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
public TsKvLatestRemovingResult(TsKvEntry data, Long version) {
|
||||||
this.key = data.getKey();
|
this.key = data.getKey();
|
||||||
this.data = data;
|
this.data = data;
|
||||||
this.removed = true;
|
this.removed = true;
|
||||||
|
this.version = version;
|
||||||
}
|
}
|
||||||
|
|
||||||
public TsKvLatestRemovingResult(String key, boolean removed) {
|
public TsKvLatestRemovingResult(String key, boolean removed, Long version) {
|
||||||
this.key = key;
|
this.key = key;
|
||||||
this.removed = removed;
|
this.removed = removed;
|
||||||
|
this.version = version;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -119,10 +119,13 @@ public abstract class AbstractTsKvEntity implements ToData<TsKvEntry> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (aggValuesCount == null) {
|
if (aggValuesCount == null) {
|
||||||
return new BasicTsKvEntry(ts, kvEntry);
|
return new BasicTsKvEntry(ts, kvEntry, getVersion());
|
||||||
} else {
|
} else {
|
||||||
return new AggTsKvEntry(ts, kvEntry, aggValuesCount);
|
return new AggTsKvEntry(ts, kvEntry, aggValuesCount);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Long getVersion() {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
@ -15,6 +15,7 @@
|
|||||||
*/
|
*/
|
||||||
package org.thingsboard.server.dao.model.sqlts.latest;
|
package org.thingsboard.server.dao.model.sqlts.latest;
|
||||||
|
|
||||||
|
import jakarta.persistence.Column;
|
||||||
import jakarta.persistence.ColumnResult;
|
import jakarta.persistence.ColumnResult;
|
||||||
import jakarta.persistence.ConstructorResult;
|
import jakarta.persistence.ConstructorResult;
|
||||||
import jakarta.persistence.Entity;
|
import jakarta.persistence.Entity;
|
||||||
@ -30,6 +31,8 @@ import org.thingsboard.server.dao.sqlts.latest.SearchTsKvLatestRepository;
|
|||||||
|
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
|
|
||||||
|
import static org.thingsboard.server.dao.model.ModelConstants.VERSION_COLUMN;
|
||||||
|
|
||||||
@Data
|
@Data
|
||||||
@Entity
|
@Entity
|
||||||
@Table(name = "ts_kv_latest")
|
@Table(name = "ts_kv_latest")
|
||||||
@ -50,7 +53,7 @@ import java.util.UUID;
|
|||||||
@ColumnResult(name = "doubleValue", type = Double.class),
|
@ColumnResult(name = "doubleValue", type = Double.class),
|
||||||
@ColumnResult(name = "jsonValue", type = String.class),
|
@ColumnResult(name = "jsonValue", type = String.class),
|
||||||
@ColumnResult(name = "ts", type = Long.class),
|
@ColumnResult(name = "ts", type = Long.class),
|
||||||
|
@ColumnResult(name = "version", type = Long.class)
|
||||||
}
|
}
|
||||||
),
|
),
|
||||||
})
|
})
|
||||||
@ -65,6 +68,9 @@ import java.util.UUID;
|
|||||||
})
|
})
|
||||||
public final class TsKvLatestEntity extends AbstractTsKvEntity {
|
public final class TsKvLatestEntity extends AbstractTsKvEntity {
|
||||||
|
|
||||||
|
@Column(name = VERSION_COLUMN)
|
||||||
|
private Long version;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean isNotEmpty() {
|
public boolean isNotEmpty() {
|
||||||
return strValue != null || longValue != null || doubleValue != null || booleanValue != null || jsonValue != null;
|
return strValue != null || longValue != null || doubleValue != null || booleanValue != null || jsonValue != null;
|
||||||
@ -73,7 +79,7 @@ public final class TsKvLatestEntity extends AbstractTsKvEntity {
|
|||||||
public TsKvLatestEntity() {
|
public TsKvLatestEntity() {
|
||||||
}
|
}
|
||||||
|
|
||||||
public TsKvLatestEntity(UUID entityId, Integer key, String strKey, String strValue, Boolean boolValue, Long longValue, Double doubleValue, String jsonValue, Long ts) {
|
public TsKvLatestEntity(UUID entityId, Integer key, String strKey, String strValue, Boolean boolValue, Long longValue, Double doubleValue, String jsonValue, Long ts, Long version) {
|
||||||
this.entityId = entityId;
|
this.entityId = entityId;
|
||||||
this.key = key;
|
this.key = key;
|
||||||
this.ts = ts;
|
this.ts = ts;
|
||||||
@ -83,5 +89,6 @@ public final class TsKvLatestEntity extends AbstractTsKvEntity {
|
|||||||
this.booleanValue = boolValue;
|
this.booleanValue = boolValue;
|
||||||
this.jsonValue = jsonValue;
|
this.jsonValue = jsonValue;
|
||||||
this.strKey = strKey;
|
this.strKey = strKey;
|
||||||
|
this.version = version;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -18,6 +18,7 @@ package org.thingsboard.server.dao.sql;
|
|||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.jdbc.core.JdbcTemplate;
|
import org.springframework.jdbc.core.JdbcTemplate;
|
||||||
|
import org.springframework.transaction.support.TransactionTemplate;
|
||||||
|
|
||||||
import javax.sql.DataSource;
|
import javax.sql.DataSource;
|
||||||
import java.sql.SQLException;
|
import java.sql.SQLException;
|
||||||
@ -36,6 +37,9 @@ public abstract class JpaAbstractDaoListeningExecutorService {
|
|||||||
@Autowired
|
@Autowired
|
||||||
protected JdbcTemplate jdbcTemplate;
|
protected JdbcTemplate jdbcTemplate;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
protected TransactionTemplate transactionTemplate;
|
||||||
|
|
||||||
protected void printWarnings(Statement statement) throws SQLException {
|
protected void printWarnings(Statement statement) throws SQLException {
|
||||||
SQLWarning warnings = statement.getWarnings();
|
SQLWarning warnings = statement.getWarnings();
|
||||||
if (warnings != null) {
|
if (warnings != null) {
|
||||||
|
|||||||
@ -206,15 +206,14 @@ public class JpaAttributeDao extends JpaAbstractDaoListeningExecutorService impl
|
|||||||
return futuresList;
|
return futuresList;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Transactional
|
|
||||||
@Override
|
@Override
|
||||||
public List<ListenableFuture<TbPair<String, Long>>> removeAllWithVersions(TenantId tenantId, EntityId entityId, AttributeScope attributeScope, List<String> keys) {
|
public List<ListenableFuture<TbPair<String, Long>>> removeAllWithVersions(TenantId tenantId, EntityId entityId, AttributeScope attributeScope, List<String> keys) {
|
||||||
List<ListenableFuture<TbPair<String, Long>>> futuresList = new ArrayList<>(keys.size());
|
List<ListenableFuture<TbPair<String, Long>>> futuresList = new ArrayList<>(keys.size());
|
||||||
for (String key : keys) {
|
for (String key : keys) {
|
||||||
futuresList.add(service.submit(() -> {
|
futuresList.add(service.submit(() -> {
|
||||||
Long version = jdbcTemplate.query("DELETE FROM attribute_kv WHERE entity_id = ? AND attribute_type = ? " +
|
Long version = transactionTemplate.execute(status -> jdbcTemplate.query("DELETE FROM attribute_kv WHERE entity_id = ? AND attribute_type = ? " +
|
||||||
"AND attribute_key = ? RETURNING nextval('attribute_kv_version_seq')",
|
"AND attribute_key = ? RETURNING nextval('attribute_kv_version_seq')",
|
||||||
rs -> rs.next() ? rs.getLong(1) : null, entityId.getId(), attributeScope.getId(), keyDictionaryDao.getOrSaveKeyId(key));
|
rs -> rs.next() ? rs.getLong(1) : null, entityId.getId(), attributeScope.getId(), keyDictionaryDao.getOrSaveKeyId(key)));
|
||||||
return TbPair.of(key, version);
|
return TbPair.of(key, version);
|
||||||
}));
|
}));
|
||||||
}
|
}
|
||||||
|
|||||||
@ -25,7 +25,7 @@ import lombok.extern.slf4j.Slf4j;
|
|||||||
import org.springframework.context.annotation.Primary;
|
import org.springframework.context.annotation.Primary;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
import org.thingsboard.server.cache.TbCacheValueWrapper;
|
import org.thingsboard.server.cache.TbCacheValueWrapper;
|
||||||
import org.thingsboard.server.cache.TbTransactionalCache;
|
import org.thingsboard.server.cache.VersionedTbCache;
|
||||||
import org.thingsboard.server.common.data.id.DeviceProfileId;
|
import org.thingsboard.server.common.data.id.DeviceProfileId;
|
||||||
import org.thingsboard.server.common.data.id.EntityId;
|
import org.thingsboard.server.common.data.id.EntityId;
|
||||||
import org.thingsboard.server.common.data.id.TenantId;
|
import org.thingsboard.server.common.data.id.TenantId;
|
||||||
@ -52,7 +52,7 @@ public class CachedRedisSqlTimeseriesLatestDao extends BaseAbstractSqlTimeseries
|
|||||||
final CacheExecutorService cacheExecutorService;
|
final CacheExecutorService cacheExecutorService;
|
||||||
final SqlTimeseriesLatestDao sqlDao;
|
final SqlTimeseriesLatestDao sqlDao;
|
||||||
final StatsFactory statsFactory;
|
final StatsFactory statsFactory;
|
||||||
final TbTransactionalCache<TsLatestCacheKey, TsKvEntry> cache;
|
final VersionedTbCache<TsLatestCacheKey, TsKvEntry> cache;
|
||||||
DefaultCounter hitCounter;
|
DefaultCounter hitCounter;
|
||||||
DefaultCounter missCounter;
|
DefaultCounter missCounter;
|
||||||
|
|
||||||
@ -64,17 +64,17 @@ public class CachedRedisSqlTimeseriesLatestDao extends BaseAbstractSqlTimeseries
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ListenableFuture<Void> saveLatest(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry) {
|
public ListenableFuture<Long> saveLatest(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry) {
|
||||||
ListenableFuture<Void> future = sqlDao.saveLatest(tenantId, entityId, tsKvEntry);
|
ListenableFuture<Long> future = sqlDao.saveLatest(tenantId, entityId, tsKvEntry);
|
||||||
future = Futures.transform(future, x -> {
|
future = Futures.transform(future, x -> {
|
||||||
cache.put(new TsLatestCacheKey(entityId, tsKvEntry.getKey()), tsKvEntry);
|
cache.put(new TsLatestCacheKey(entityId, tsKvEntry.getKey()), tsKvEntry, x);
|
||||||
return x;
|
return x;
|
||||||
},
|
},
|
||||||
cacheExecutorService);
|
cacheExecutorService);
|
||||||
if (log.isTraceEnabled()) {
|
if (log.isTraceEnabled()) {
|
||||||
Futures.addCallback(future, new FutureCallback<>() {
|
Futures.addCallback(future, new FutureCallback<>() {
|
||||||
@Override
|
@Override
|
||||||
public void onSuccess(Void result) {
|
public void onSuccess(Long result) {
|
||||||
log.trace("saveLatest onSuccess [{}][{}][{}]", entityId, tsKvEntry.getKey(), tsKvEntry);
|
log.trace("saveLatest onSuccess [{}][{}][{}]", entityId, tsKvEntry.getKey(), tsKvEntry);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -91,7 +91,15 @@ public class CachedRedisSqlTimeseriesLatestDao extends BaseAbstractSqlTimeseries
|
|||||||
public ListenableFuture<TsKvLatestRemovingResult> removeLatest(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) {
|
public ListenableFuture<TsKvLatestRemovingResult> removeLatest(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) {
|
||||||
ListenableFuture<TsKvLatestRemovingResult> future = sqlDao.removeLatest(tenantId, entityId, query);
|
ListenableFuture<TsKvLatestRemovingResult> future = sqlDao.removeLatest(tenantId, entityId, query);
|
||||||
future = Futures.transform(future, x -> {
|
future = Futures.transform(future, x -> {
|
||||||
cache.evict(new TsLatestCacheKey(entityId, query.getKey()));
|
if (x.isRemoved()) {
|
||||||
|
TsLatestCacheKey key = new TsLatestCacheKey(entityId, query.getKey());
|
||||||
|
Long version = x.getVersion();
|
||||||
|
if (x.getData() != null) {
|
||||||
|
cache.put(key, x.getData(), version);
|
||||||
|
} else {
|
||||||
|
cache.evict(key, version);
|
||||||
|
}
|
||||||
|
}
|
||||||
return x;
|
return x;
|
||||||
},
|
},
|
||||||
cacheExecutorService);
|
cacheExecutorService);
|
||||||
@ -133,32 +141,11 @@ public class CachedRedisSqlTimeseriesLatestDao extends BaseAbstractSqlTimeseries
|
|||||||
return Futures.immediateFuture(Optional.ofNullable(tsKvEntry));
|
return Futures.immediateFuture(Optional.ofNullable(tsKvEntry));
|
||||||
}
|
}
|
||||||
log.debug("findLatest cache miss [{}][{}]", entityId, key);
|
log.debug("findLatest cache miss [{}][{}]", entityId, key);
|
||||||
ListenableFuture<Optional<TsKvEntry>> daoFuture = sqlDao.findLatestOpt(tenantId,entityId, key);
|
ListenableFuture<Optional<TsKvEntry>> daoFuture = sqlDao.findLatestOpt(tenantId, entityId, key);
|
||||||
|
|
||||||
return Futures.transformAsync(daoFuture, (daoValue) -> {
|
return Futures.transform(daoFuture, daoValue -> {
|
||||||
|
cache.put(cacheKey, daoValue.orElse(null));
|
||||||
if (daoValue.isEmpty()) {
|
return daoValue;
|
||||||
//TODO implement the cache logic if no latest found in TS DAO. Currently we are always getting from DB to stay on the safe side
|
|
||||||
return Futures.immediateFuture(daoValue);
|
|
||||||
}
|
|
||||||
ListenableFuture<Optional<TsKvEntry>> cachePutFuture = cacheExecutorService.submit(() -> {
|
|
||||||
cache.put(new TsLatestCacheKey(entityId, key), daoValue.get());
|
|
||||||
return daoValue;
|
|
||||||
});
|
|
||||||
|
|
||||||
Futures.addCallback(cachePutFuture, new FutureCallback<>() {
|
|
||||||
@Override
|
|
||||||
public void onSuccess(Optional<TsKvEntry> result) {
|
|
||||||
log.trace("saveLatest onSuccess [{}][{}][{}]", entityId, key, result);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onFailure(Throwable t) {
|
|
||||||
log.info("saveLatest onFailure [{}][{}][{}]", entityId, key, daoValue, t);
|
|
||||||
}
|
|
||||||
|
|
||||||
}, MoreExecutors.directExecutor());
|
|
||||||
return cachePutFuture;
|
|
||||||
}, MoreExecutors.directExecutor());
|
}, MoreExecutors.directExecutor());
|
||||||
}, MoreExecutors.directExecutor());
|
}, MoreExecutors.directExecutor());
|
||||||
}
|
}
|
||||||
|
|||||||
@ -190,7 +190,7 @@ public class SqlTimeseriesLatestDao extends BaseAbstractSqlTimeseriesDao impleme
|
|||||||
return Futures.transformAsync(future, entryList -> {
|
return Futures.transformAsync(future, entryList -> {
|
||||||
if (entryList.size() == 1) {
|
if (entryList.size() == 1) {
|
||||||
TsKvEntry entry = entryList.get(0);
|
TsKvEntry entry = entryList.get(0);
|
||||||
return Futures.transform(getSaveLatestFuture(entityId, entry), v -> new TsKvLatestRemovingResult(entry), MoreExecutors.directExecutor());
|
return Futures.transform(getSaveLatestFuture(entityId, entry), v -> new TsKvLatestRemovingResult(entry, v), MoreExecutors.directExecutor());
|
||||||
} else {
|
} else {
|
||||||
log.trace("Could not find new latest value for [{}], key - {}", entityId, query.getKey());
|
log.trace("Could not find new latest value for [{}], key - {}", entityId, query.getKey());
|
||||||
}
|
}
|
||||||
@ -229,18 +229,18 @@ public class SqlTimeseriesLatestDao extends BaseAbstractSqlTimeseriesDao impleme
|
|||||||
return Futures.immediateFuture(new TsKvLatestRemovingResult(query.getKey(), false));
|
return Futures.immediateFuture(new TsKvLatestRemovingResult(query.getKey(), false));
|
||||||
}
|
}
|
||||||
boolean isRemoved = false;
|
boolean isRemoved = false;
|
||||||
|
Long version = null;
|
||||||
long ts = latest.getTs();
|
long ts = latest.getTs();
|
||||||
if (ts >= query.getStartTs() && ts < query.getEndTs()) {
|
if (ts >= query.getStartTs() && ts < query.getEndTs()) {
|
||||||
TsKvLatestEntity latestEntity = new TsKvLatestEntity();
|
version = transactionTemplate.execute(status -> jdbcTemplate.query("DELETE FROM ts_kv_latest WHERE entity_id = ? " +
|
||||||
latestEntity.setEntityId(entityId.getId());
|
"AND key = ? RETURNING nextval('ts_kv_latest_version_seq')",
|
||||||
latestEntity.setKey(keyDictionaryDao.getOrSaveKeyId(query.getKey()));
|
rs -> rs.next() ? rs.getLong(1) : null, entityId.getId(), keyDictionaryDao.getOrSaveKeyId(query.getKey())));
|
||||||
tsKvLatestRepository.delete(latestEntity);
|
|
||||||
isRemoved = true;
|
isRemoved = true;
|
||||||
if (query.getRewriteLatestIfDeleted()) {
|
if (query.getRewriteLatestIfDeleted()) {
|
||||||
return getNewLatestEntryFuture(tenantId, entityId, query);
|
return getNewLatestEntryFuture(tenantId, entityId, query);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return Futures.immediateFuture(new TsKvLatestRemovingResult(query.getKey(), isRemoved));
|
return Futures.immediateFuture(new TsKvLatestRemovingResult(query.getKey(), isRemoved, version));
|
||||||
}, MoreExecutors.directExecutor());
|
}, MoreExecutors.directExecutor());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -31,7 +31,7 @@ public class SearchTsKvLatestRepository {
|
|||||||
public static final String FIND_ALL_BY_ENTITY_ID = "findAllByEntityId";
|
public static final String FIND_ALL_BY_ENTITY_ID = "findAllByEntityId";
|
||||||
|
|
||||||
public static final String FIND_ALL_BY_ENTITY_ID_QUERY = "SELECT ts_kv_latest.entity_id AS entityId, ts_kv_latest.key AS key, key_dictionary.key AS strKey, ts_kv_latest.str_v AS strValue," +
|
public static final String FIND_ALL_BY_ENTITY_ID_QUERY = "SELECT ts_kv_latest.entity_id AS entityId, ts_kv_latest.key AS key, key_dictionary.key AS strKey, ts_kv_latest.str_v AS strValue," +
|
||||||
" ts_kv_latest.bool_v AS boolValue, ts_kv_latest.long_v AS longValue, ts_kv_latest.dbl_v AS doubleValue, ts_kv_latest.json_v AS jsonValue, ts_kv_latest.ts AS ts FROM ts_kv_latest " +
|
" ts_kv_latest.bool_v AS boolValue, ts_kv_latest.long_v AS longValue, ts_kv_latest.dbl_v AS doubleValue, ts_kv_latest.json_v AS jsonValue, ts_kv_latest.ts AS ts, ts_kv_latest.version AS version FROM ts_kv_latest " +
|
||||||
"INNER JOIN key_dictionary ON ts_kv_latest.key = key_dictionary.key_id WHERE ts_kv_latest.entity_id = cast(:id AS uuid)";
|
"INNER JOIN key_dictionary ON ts_kv_latest.key = key_dictionary.key_id WHERE ts_kv_latest.entity_id = cast(:id AS uuid)";
|
||||||
|
|
||||||
@PersistenceContext
|
@PersistenceContext
|
||||||
|
|||||||
@ -161,7 +161,7 @@ public class CassandraBaseTimeseriesLatestDao extends AbstractCassandraBaseTimes
|
|||||||
var entryList = result.getData();
|
var entryList = result.getData();
|
||||||
if (entryList.size() == 1) {
|
if (entryList.size() == 1) {
|
||||||
TsKvEntry entry = entryList.get(0);
|
TsKvEntry entry = entryList.get(0);
|
||||||
return Futures.transform(saveLatest(tenantId, entityId, entryList.get(0)), v -> new TsKvLatestRemovingResult(entry), MoreExecutors.directExecutor());
|
return Futures.transform(saveLatest(tenantId, entityId, entryList.get(0)), v -> new TsKvLatestRemovingResult(entry, v), MoreExecutors.directExecutor());
|
||||||
} else {
|
} else {
|
||||||
log.trace("Could not find new latest value for [{}], key - {}", entityId, query.getKey());
|
log.trace("Could not find new latest value for [{}], key - {}", entityId, query.getKey());
|
||||||
}
|
}
|
||||||
|
|||||||
@ -15,134 +15,41 @@
|
|||||||
*/
|
*/
|
||||||
package org.thingsboard.server.dao.timeseries;
|
package org.thingsboard.server.dao.timeseries;
|
||||||
|
|
||||||
import jakarta.annotation.PostConstruct;
|
import com.google.protobuf.InvalidProtocolBufferException;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.apache.commons.lang3.NotImplementedException;
|
|
||||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||||
import org.springframework.dao.InvalidDataAccessApiUsageException;
|
|
||||||
import org.springframework.data.redis.connection.RedisConnection;
|
|
||||||
import org.springframework.data.redis.connection.RedisConnectionFactory;
|
import org.springframework.data.redis.connection.RedisConnectionFactory;
|
||||||
import org.springframework.data.redis.connection.ReturnType;
|
import org.springframework.data.redis.serializer.SerializationException;
|
||||||
import org.springframework.data.redis.serializer.StringRedisSerializer;
|
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
import org.thingsboard.server.cache.CacheSpecsMap;
|
import org.thingsboard.server.cache.CacheSpecsMap;
|
||||||
import org.thingsboard.server.cache.RedisTbTransactionalCache;
|
|
||||||
import org.thingsboard.server.cache.TBRedisCacheConfiguration;
|
import org.thingsboard.server.cache.TBRedisCacheConfiguration;
|
||||||
import org.thingsboard.server.cache.TbCacheTransaction;
|
import org.thingsboard.server.cache.TbRedisSerializer;
|
||||||
import org.thingsboard.server.cache.TbCacheValueWrapper;
|
import org.thingsboard.server.cache.VersionedRedisTbCache;
|
||||||
import org.thingsboard.server.cache.TbJavaRedisSerializer;
|
|
||||||
import org.thingsboard.server.common.data.CacheConstants;
|
import org.thingsboard.server.common.data.CacheConstants;
|
||||||
import org.thingsboard.server.common.data.kv.TsKvEntry;
|
import org.thingsboard.server.common.data.kv.TsKvEntry;
|
||||||
|
import org.thingsboard.server.common.util.KvProtoUtil;
|
||||||
import java.io.Serializable;
|
import org.thingsboard.server.gen.transport.TransportProtos;
|
||||||
import java.util.Arrays;
|
|
||||||
import java.util.Collection;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Set;
|
|
||||||
|
|
||||||
@ConditionalOnProperty(prefix = "cache", value = "type", havingValue = "redis")
|
@ConditionalOnProperty(prefix = "cache", value = "type", havingValue = "redis")
|
||||||
@Service("TsLatestCache")
|
@Service("TsLatestCache")
|
||||||
@Slf4j
|
@Slf4j
|
||||||
public class TsLatestRedisCache<K extends Serializable, V extends Serializable> extends RedisTbTransactionalCache<TsLatestCacheKey, TsKvEntry> {
|
public class TsLatestRedisCache extends VersionedRedisTbCache<TsLatestCacheKey, TsKvEntry> {
|
||||||
|
|
||||||
static final byte[] UPSERT_TS_LATEST_LUA_SCRIPT = StringRedisSerializer.UTF_8.serialize("" +
|
|
||||||
"redis.call('ZREMRANGEBYSCORE', KEYS[1], ARGV[1], ARGV[1]); " +
|
|
||||||
"redis.call('ZADD', KEYS[1], ARGV[1], ARGV[2]); " +
|
|
||||||
"local current_size = redis.call('ZCARD', KEYS[1]); " +
|
|
||||||
"if current_size > 1 then" +
|
|
||||||
" redis.call('ZREMRANGEBYRANK', KEYS[1], 0, -2) " +
|
|
||||||
"end;");
|
|
||||||
static final byte[] UPSERT_TS_LATEST_SHA = StringRedisSerializer.UTF_8.serialize("24e226c3ea34e3e850113e8eb1f3cd2b88171988");
|
|
||||||
|
|
||||||
public TsLatestRedisCache(TBRedisCacheConfiguration configuration, CacheSpecsMap cacheSpecsMap, RedisConnectionFactory connectionFactory) {
|
public TsLatestRedisCache(TBRedisCacheConfiguration configuration, CacheSpecsMap cacheSpecsMap, RedisConnectionFactory connectionFactory) {
|
||||||
super(CacheConstants.TS_LATEST_CACHE, cacheSpecsMap, connectionFactory, configuration, new TbJavaRedisSerializer<>());
|
super(CacheConstants.TS_LATEST_CACHE, cacheSpecsMap, connectionFactory, configuration, new TbRedisSerializer<>() {
|
||||||
}
|
@Override
|
||||||
|
public byte[] serialize(TsKvEntry tsKvEntry) throws SerializationException {
|
||||||
@PostConstruct
|
return KvProtoUtil.toTsKvProto(tsKvEntry.getTs(), tsKvEntry).toByteArray();
|
||||||
public void init() {
|
|
||||||
try (var connection = getConnection(UPSERT_TS_LATEST_SHA)) {
|
|
||||||
log.debug("Loading LUA with expected SHA[{}], connection [{}]", new String(UPSERT_TS_LATEST_SHA), connection.getNativeConnection());
|
|
||||||
String sha = connection.scriptingCommands().scriptLoad(UPSERT_TS_LATEST_LUA_SCRIPT);
|
|
||||||
if (!Arrays.equals(UPSERT_TS_LATEST_SHA, StringRedisSerializer.UTF_8.serialize(sha))) {
|
|
||||||
log.error("SHA for UPSERT_TS_LATEST_LUA_SCRIPT wrong! Expected [{}], but actual [{}], connection [{}]", new String(UPSERT_TS_LATEST_SHA), sha, connection.getNativeConnection());
|
|
||||||
}
|
}
|
||||||
} catch (Throwable t) {
|
|
||||||
log.error("Error on Redis TS Latest cache init", t);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public TbCacheValueWrapper<TsKvEntry> get(TsLatestCacheKey key) {
|
public TsKvEntry deserialize(TsLatestCacheKey key, byte[] bytes) throws SerializationException {
|
||||||
log.debug("get [{}]", key);
|
|
||||||
return super.get(key);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected byte[] doGet(RedisConnection connection, byte[] rawKey) {
|
|
||||||
log.trace("doGet [{}][{}]", connection, rawKey);
|
|
||||||
Set<byte[]> values = connection.commands().zRange(rawKey, -1, -1);
|
|
||||||
return values == null ? null : values.stream().findFirst().orElse(null);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void put(TsLatestCacheKey key, TsKvEntry value) {
|
|
||||||
log.trace("put [{}][{}]", key, value);
|
|
||||||
final byte[] rawKey = getRawKey(key);
|
|
||||||
try (var connection = getConnection(rawKey)) {
|
|
||||||
byte[] rawValue = getRawValue(value);
|
|
||||||
byte[] ts = StringRedisSerializer.UTF_8.serialize(String.valueOf(value.toTsValue().getTs()));
|
|
||||||
try {
|
|
||||||
connection.scriptingCommands().evalSha(UPSERT_TS_LATEST_SHA, ReturnType.VALUE, 1, rawKey, ts, rawValue);
|
|
||||||
} catch (InvalidDataAccessApiUsageException e) {
|
|
||||||
log.debug("loading LUA [{}]", connection.getNativeConnection());
|
|
||||||
String sha = connection.scriptingCommands().scriptLoad(UPSERT_TS_LATEST_LUA_SCRIPT);
|
|
||||||
if (!Arrays.equals(UPSERT_TS_LATEST_SHA, StringRedisSerializer.UTF_8.serialize(sha))) {
|
|
||||||
log.error("SHA for UPSERT_TS_LATEST_LUA_SCRIPT wrong! Expected [{}], but actual [{}]", new String(UPSERT_TS_LATEST_SHA), sha);
|
|
||||||
}
|
|
||||||
try {
|
try {
|
||||||
connection.scriptingCommands().evalSha(UPSERT_TS_LATEST_SHA, ReturnType.VALUE, 1, rawKey, ts, rawValue);
|
return KvProtoUtil.fromTsKvProto(TransportProtos.TsKvProto.parseFrom(bytes));
|
||||||
} catch (InvalidDataAccessApiUsageException ignored) {
|
} catch (InvalidProtocolBufferException e) {
|
||||||
log.debug("Slowly executing eval instead of fast evalsha");
|
throw new SerializationException(e.getMessage());
|
||||||
connection.scriptingCommands().eval(UPSERT_TS_LATEST_LUA_SCRIPT, ReturnType.VALUE, 1, rawKey, ts, rawValue);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public void evict(TsLatestCacheKey key) {
|
|
||||||
log.trace("evict [{}]", key);
|
|
||||||
final byte[] rawKey = getRawKey(key);
|
|
||||||
try (var connection = getConnection(rawKey)) {
|
|
||||||
connection.keyCommands().del(rawKey);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void putIfAbsent(TsLatestCacheKey key, TsKvEntry value) {
|
|
||||||
log.trace("putIfAbsent [{}][{}]", key, value);
|
|
||||||
throw new NotImplementedException("putIfAbsent is not supported by TsLatestRedisCache");
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void evict(Collection<TsLatestCacheKey> keys) {
|
|
||||||
throw new NotImplementedException("evict by many keys is not supported by TsLatestRedisCache");
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void evictOrPut(TsLatestCacheKey key, TsKvEntry value) {
|
|
||||||
throw new NotImplementedException("evictOrPut is not supported by TsLatestRedisCache");
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public TbCacheTransaction<TsLatestCacheKey, TsKvEntry> newTransactionForKey(TsLatestCacheKey key) {
|
|
||||||
throw new NotImplementedException("newTransactionForKey is not supported by TsLatestRedisCache");
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public TbCacheTransaction<TsLatestCacheKey, TsKvEntry> newTransactionForKeys(List<TsLatestCacheKey> keys) {
|
|
||||||
throw new NotImplementedException("newTransactionForKeys is not supported by TsLatestRedisCache");
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -59,9 +59,6 @@ public abstract class BaseAttributesServiceTest extends AbstractServiceTest {
|
|||||||
private static final String OLD_VALUE = "OLD VALUE";
|
private static final String OLD_VALUE = "OLD VALUE";
|
||||||
private static final String NEW_VALUE = "NEW VALUE";
|
private static final String NEW_VALUE = "NEW VALUE";
|
||||||
|
|
||||||
@Autowired
|
|
||||||
private VersionedTbCache<AttributeCacheKey, AttributeKvEntry> cache;
|
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
private AttributesService attributesService;
|
private AttributesService attributesService;
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user