added CachedRedisSqlTimeseriesLatestDao and TsLatestRedisCache (prototype impl with string commands). Caffeine cache not implemented yet
This commit is contained in:
parent
9b41a4e26c
commit
551325b8c7
@ -24,6 +24,7 @@ import lombok.RequiredArgsConstructor;
|
|||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.springframework.context.annotation.Primary;
|
import org.springframework.context.annotation.Primary;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
import org.thingsboard.server.cache.TbCacheValueWrapper;
|
||||||
import org.thingsboard.server.cache.TbTransactionalCache;
|
import org.thingsboard.server.cache.TbTransactionalCache;
|
||||||
import org.thingsboard.server.common.data.id.DeviceProfileId;
|
import org.thingsboard.server.common.data.id.DeviceProfileId;
|
||||||
import org.thingsboard.server.common.data.id.EntityId;
|
import org.thingsboard.server.common.data.id.EntityId;
|
||||||
@ -48,14 +49,12 @@ import java.util.Optional;
|
|||||||
@Primary
|
@Primary
|
||||||
public class CachedRedisSqlTimeseriesLatestDao extends BaseAbstractSqlTimeseriesDao implements TimeseriesLatestDao {
|
public class CachedRedisSqlTimeseriesLatestDao extends BaseAbstractSqlTimeseriesDao implements TimeseriesLatestDao {
|
||||||
public static final String STATS_NAME = "ts_latest.cache";
|
public static final String STATS_NAME = "ts_latest.cache";
|
||||||
|
|
||||||
DefaultCounter hitCounter;
|
|
||||||
DefaultCounter missCounter;
|
|
||||||
|
|
||||||
final CacheExecutorService cacheExecutorService;
|
final CacheExecutorService cacheExecutorService;
|
||||||
final SqlTimeseriesLatestDao sqlDao;
|
final SqlTimeseriesLatestDao sqlDao;
|
||||||
final StatsFactory statsFactory;
|
final StatsFactory statsFactory;
|
||||||
final TbTransactionalCache<TsLatestCacheKey, TsKvEntry> cache;
|
final TbTransactionalCache<TsLatestCacheKey, TsKvEntry> cache;
|
||||||
|
DefaultCounter hitCounter;
|
||||||
|
DefaultCounter missCounter;
|
||||||
|
|
||||||
@PostConstruct
|
@PostConstruct
|
||||||
public void init() {
|
public void init() {
|
||||||
@ -72,37 +71,101 @@ public class CachedRedisSqlTimeseriesLatestDao extends BaseAbstractSqlTimeseries
|
|||||||
return x;
|
return x;
|
||||||
},
|
},
|
||||||
cacheExecutorService);
|
cacheExecutorService);
|
||||||
Futures.addCallback(future, new FutureCallback<Void>() {
|
if (log.isTraceEnabled()) {
|
||||||
@Override
|
Futures.addCallback(future, new FutureCallback<>() {
|
||||||
public void onSuccess(Void result) {
|
@Override
|
||||||
log.trace("saveLatest onSuccess [{}][{}][{}]", entityId, tsKvEntry.getKey(), tsKvEntry);
|
public void onSuccess(Void result) {
|
||||||
}
|
log.trace("saveLatest onSuccess [{}][{}][{}]", entityId, tsKvEntry.getKey(), tsKvEntry);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onFailure(Throwable t) {
|
public void onFailure(Throwable t) {
|
||||||
log.trace("saveLatest onFailure [{}][{}][{}]", entityId, tsKvEntry.getKey(), tsKvEntry, t);
|
log.info("saveLatest onFailure [{}][{}][{}]", entityId, tsKvEntry.getKey(), tsKvEntry, t);
|
||||||
}
|
}
|
||||||
}, MoreExecutors.directExecutor());
|
}, MoreExecutors.directExecutor());
|
||||||
|
}
|
||||||
return future;
|
return future;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ListenableFuture<TsKvLatestRemovingResult> removeLatest(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) {
|
public ListenableFuture<TsKvLatestRemovingResult> removeLatest(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) {
|
||||||
return sqlDao.removeLatest(tenantId, entityId, query);
|
ListenableFuture<TsKvLatestRemovingResult> future = sqlDao.removeLatest(tenantId, entityId, query);
|
||||||
|
future = Futures.transform(future, x -> {
|
||||||
|
cache.evict(new TsLatestCacheKey(entityId, query.getKey()));
|
||||||
|
return x;
|
||||||
|
},
|
||||||
|
cacheExecutorService);
|
||||||
|
if (log.isTraceEnabled()) {
|
||||||
|
Futures.addCallback(future, new FutureCallback<>() {
|
||||||
|
@Override
|
||||||
|
public void onSuccess(TsKvLatestRemovingResult result) {
|
||||||
|
log.trace("removeLatest onSuccess [{}][{}][{}]", entityId, query.getKey(), query);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onFailure(Throwable t) {
|
||||||
|
log.info("removeLatest onFailure [{}][{}][{}]", entityId, query.getKey(), query, t);
|
||||||
|
}
|
||||||
|
}, MoreExecutors.directExecutor());
|
||||||
|
}
|
||||||
|
return future;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ListenableFuture<Optional<TsKvEntry>> findLatestOpt(TenantId tenantId, EntityId entityId, String key) {
|
public ListenableFuture<Optional<TsKvEntry>> findLatestOpt(TenantId tenantId, EntityId entityId, String key) {
|
||||||
return sqlDao.findLatestOpt(tenantId, entityId, key);
|
log.trace("findLatestOpt");
|
||||||
|
return doFindLatest(tenantId, entityId, key);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ListenableFuture<TsKvEntry> findLatest(TenantId tenantId, EntityId entityId, String key) {
|
public ListenableFuture<TsKvEntry> findLatest(TenantId tenantId, EntityId entityId, String key) {
|
||||||
return sqlDao.findLatest(tenantId, entityId, key);
|
return Futures.transform(doFindLatest(tenantId, entityId, key), x -> sqlDao.wrapNullTsKvEntry(key, x.orElse(null)), MoreExecutors.directExecutor());
|
||||||
|
}
|
||||||
|
|
||||||
|
public ListenableFuture<Optional<TsKvEntry>> doFindLatest(TenantId tenantId, EntityId entityId, String key) {
|
||||||
|
final TsLatestCacheKey cacheKey = new TsLatestCacheKey(entityId, key);
|
||||||
|
ListenableFuture<TbCacheValueWrapper<TsKvEntry>> cacheFuture = cacheExecutorService.submit(() -> cache.get(cacheKey));
|
||||||
|
|
||||||
|
return Futures.transformAsync(cacheFuture, (cacheValueWrap) -> {
|
||||||
|
if (cacheValueWrap != null) {
|
||||||
|
final TsKvEntry tsKvEntry = cacheValueWrap.get();
|
||||||
|
log.debug("findLatest cache hit [{}][{}][{}]", entityId, key, tsKvEntry);
|
||||||
|
return Futures.immediateFuture(Optional.ofNullable(tsKvEntry));
|
||||||
|
}
|
||||||
|
log.debug("findLatest cache miss [{}][{}]", entityId, key);
|
||||||
|
ListenableFuture<Optional<TsKvEntry>> daoFuture = sqlDao.findLatestOpt(tenantId,entityId, key);
|
||||||
|
|
||||||
|
return Futures.transformAsync(daoFuture, (daoValue) -> {
|
||||||
|
|
||||||
|
if (daoValue.isEmpty()) {
|
||||||
|
//TODO implement the cache logic if no latest found in TS DAO. Currently we are always getting from DB to stay on the safe side
|
||||||
|
return Futures.immediateFuture(daoValue);
|
||||||
|
}
|
||||||
|
ListenableFuture<Optional<TsKvEntry>> cachePutFuture = cacheExecutorService.submit(() -> {
|
||||||
|
cache.put(new TsLatestCacheKey(entityId, key), daoValue.get());
|
||||||
|
return daoValue;
|
||||||
|
});
|
||||||
|
|
||||||
|
Futures.addCallback(cachePutFuture, new FutureCallback<>() {
|
||||||
|
@Override
|
||||||
|
public void onSuccess(Optional<TsKvEntry> result) {
|
||||||
|
log.trace("saveLatest onSuccess [{}][{}][{}]", entityId, key, result);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onFailure(Throwable t) {
|
||||||
|
log.info("saveLatest onFailure [{}][{}][{}]", entityId, key, daoValue, t);
|
||||||
|
}
|
||||||
|
|
||||||
|
}, MoreExecutors.directExecutor());
|
||||||
|
return cachePutFuture;
|
||||||
|
}, MoreExecutors.directExecutor());
|
||||||
|
}, MoreExecutors.directExecutor());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public TsKvEntry findLatestSync(TenantId tenantId, EntityId entityId, String key) {
|
public TsKvEntry findLatestSync(TenantId tenantId, EntityId entityId, String key) {
|
||||||
|
log.trace("findLatestSync DEPRECATED [{}][{}]", entityId, key);
|
||||||
return sqlDao.findLatestSync(tenantId, entityId, key);
|
return sqlDao.findLatestSync(tenantId, entityId, key);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -16,18 +16,24 @@
|
|||||||
package org.thingsboard.server.dao.timeseries;
|
package org.thingsboard.server.dao.timeseries;
|
||||||
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.apache.commons.lang3.NotImplementedException;
|
||||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||||
|
import org.springframework.data.redis.connection.RedisConnection;
|
||||||
import org.springframework.data.redis.connection.RedisConnectionFactory;
|
import org.springframework.data.redis.connection.RedisConnectionFactory;
|
||||||
import org.springframework.data.redis.connection.RedisStringCommands;
|
import org.springframework.data.redis.connection.RedisStringCommands;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
import org.thingsboard.server.cache.CacheSpecsMap;
|
import org.thingsboard.server.cache.CacheSpecsMap;
|
||||||
import org.thingsboard.server.cache.RedisTbTransactionalCache;
|
import org.thingsboard.server.cache.RedisTbTransactionalCache;
|
||||||
import org.thingsboard.server.cache.TBRedisCacheConfiguration;
|
import org.thingsboard.server.cache.TBRedisCacheConfiguration;
|
||||||
|
import org.thingsboard.server.cache.TbCacheTransaction;
|
||||||
|
import org.thingsboard.server.cache.TbCacheValueWrapper;
|
||||||
import org.thingsboard.server.cache.TbJavaRedisSerializer;
|
import org.thingsboard.server.cache.TbJavaRedisSerializer;
|
||||||
import org.thingsboard.server.common.data.CacheConstants;
|
import org.thingsboard.server.common.data.CacheConstants;
|
||||||
import org.thingsboard.server.common.data.kv.TsKvEntry;
|
import org.thingsboard.server.common.data.kv.TsKvEntry;
|
||||||
|
|
||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
@ConditionalOnProperty(prefix = "cache", value = "type", havingValue = "redis")
|
@ConditionalOnProperty(prefix = "cache", value = "type", havingValue = "redis")
|
||||||
@Service("TsLatestCache")
|
@Service("TsLatestCache")
|
||||||
@ -47,4 +53,51 @@ public class TsLatestRedisCache<K extends Serializable, V extends Serializable>
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void putIfAbsent(TsLatestCacheKey key, TsKvEntry value) {
|
||||||
|
log.trace("putIfAbsent [{}][{}]", key, value);
|
||||||
|
throw new NotImplementedException("putIfAbsent is not supported by TsLatestRedisCache");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public TbCacheValueWrapper<TsKvEntry> get(TsLatestCacheKey key) {
|
||||||
|
log.debug("get [{}]", key);
|
||||||
|
return super.get(key);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected byte[] doGet(RedisConnection connection, byte[] rawKey) {
|
||||||
|
log.trace("doGet [{}][{}]", connection, rawKey);
|
||||||
|
return connection.stringCommands().get(rawKey);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void evict(TsLatestCacheKey key){
|
||||||
|
log.trace("evict [{}]", key);
|
||||||
|
final byte[] rawKey = getRawKey(key);
|
||||||
|
try (var connection = getConnection(rawKey)) {
|
||||||
|
connection.keyCommands().del(rawKey);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void evict(Collection<TsLatestCacheKey> keys) {
|
||||||
|
throw new NotImplementedException("evict by many keys is not supported by TsLatestRedisCache");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void evictOrPut(TsLatestCacheKey key, TsKvEntry value) {
|
||||||
|
throw new NotImplementedException("evictOrPut is not supported by TsLatestRedisCache");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public TbCacheTransaction<TsLatestCacheKey, TsKvEntry> newTransactionForKey(TsLatestCacheKey key) {
|
||||||
|
throw new NotImplementedException("newTransactionForKey is not supported by TsLatestRedisCache");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public TbCacheTransaction<TsLatestCacheKey, TsKvEntry> newTransactionForKeys(List<TsLatestCacheKey> keys) {
|
||||||
|
throw new NotImplementedException("newTransactionForKeys is not supported by TsLatestRedisCache");
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user