diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/CachedRedisSqlTimeseriesLatestDao.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/CachedRedisSqlTimeseriesLatestDao.java index b283832186..482b456e60 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/CachedRedisSqlTimeseriesLatestDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/CachedRedisSqlTimeseriesLatestDao.java @@ -24,6 +24,7 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.context.annotation.Primary; import org.springframework.stereotype.Component; +import org.thingsboard.server.cache.TbCacheValueWrapper; import org.thingsboard.server.cache.TbTransactionalCache; import org.thingsboard.server.common.data.id.DeviceProfileId; import org.thingsboard.server.common.data.id.EntityId; @@ -48,14 +49,12 @@ import java.util.Optional; @Primary public class CachedRedisSqlTimeseriesLatestDao extends BaseAbstractSqlTimeseriesDao implements TimeseriesLatestDao { public static final String STATS_NAME = "ts_latest.cache"; - - DefaultCounter hitCounter; - DefaultCounter missCounter; - final CacheExecutorService cacheExecutorService; final SqlTimeseriesLatestDao sqlDao; final StatsFactory statsFactory; final TbTransactionalCache cache; + DefaultCounter hitCounter; + DefaultCounter missCounter; @PostConstruct public void init() { @@ -72,37 +71,101 @@ public class CachedRedisSqlTimeseriesLatestDao extends BaseAbstractSqlTimeseries return x; }, cacheExecutorService); - Futures.addCallback(future, new FutureCallback() { - @Override - public void onSuccess(Void result) { - log.trace("saveLatest onSuccess [{}][{}][{}]", entityId, tsKvEntry.getKey(), tsKvEntry); - } + if (log.isTraceEnabled()) { + Futures.addCallback(future, new FutureCallback<>() { + @Override + public void onSuccess(Void result) { + log.trace("saveLatest onSuccess [{}][{}][{}]", entityId, tsKvEntry.getKey(), tsKvEntry); + } - @Override - public void onFailure(Throwable t) { - log.trace("saveLatest onFailure [{}][{}][{}]", entityId, tsKvEntry.getKey(), tsKvEntry, t); - } - }, MoreExecutors.directExecutor()); + @Override + public void onFailure(Throwable t) { + log.info("saveLatest onFailure [{}][{}][{}]", entityId, tsKvEntry.getKey(), tsKvEntry, t); + } + }, MoreExecutors.directExecutor()); + } return future; } @Override public ListenableFuture removeLatest(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) { - return sqlDao.removeLatest(tenantId, entityId, query); + ListenableFuture future = sqlDao.removeLatest(tenantId, entityId, query); + future = Futures.transform(future, x -> { + cache.evict(new TsLatestCacheKey(entityId, query.getKey())); + return x; + }, + cacheExecutorService); + if (log.isTraceEnabled()) { + Futures.addCallback(future, new FutureCallback<>() { + @Override + public void onSuccess(TsKvLatestRemovingResult result) { + log.trace("removeLatest onSuccess [{}][{}][{}]", entityId, query.getKey(), query); + } + + @Override + public void onFailure(Throwable t) { + log.info("removeLatest onFailure [{}][{}][{}]", entityId, query.getKey(), query, t); + } + }, MoreExecutors.directExecutor()); + } + return future; } @Override public ListenableFuture> findLatestOpt(TenantId tenantId, EntityId entityId, String key) { - return sqlDao.findLatestOpt(tenantId, entityId, key); + log.trace("findLatestOpt"); + return doFindLatest(tenantId, entityId, key); } @Override public ListenableFuture findLatest(TenantId tenantId, EntityId entityId, String key) { - return sqlDao.findLatest(tenantId, entityId, key); + return Futures.transform(doFindLatest(tenantId, entityId, key), x -> sqlDao.wrapNullTsKvEntry(key, x.orElse(null)), MoreExecutors.directExecutor()); + } + + public ListenableFuture> doFindLatest(TenantId tenantId, EntityId entityId, String key) { + final TsLatestCacheKey cacheKey = new TsLatestCacheKey(entityId, key); + ListenableFuture> cacheFuture = cacheExecutorService.submit(() -> cache.get(cacheKey)); + + return Futures.transformAsync(cacheFuture, (cacheValueWrap) -> { + if (cacheValueWrap != null) { + final TsKvEntry tsKvEntry = cacheValueWrap.get(); + log.debug("findLatest cache hit [{}][{}][{}]", entityId, key, tsKvEntry); + return Futures.immediateFuture(Optional.ofNullable(tsKvEntry)); + } + log.debug("findLatest cache miss [{}][{}]", entityId, key); + ListenableFuture> daoFuture = sqlDao.findLatestOpt(tenantId,entityId, key); + + return Futures.transformAsync(daoFuture, (daoValue) -> { + + if (daoValue.isEmpty()) { + //TODO implement the cache logic if no latest found in TS DAO. Currently we are always getting from DB to stay on the safe side + return Futures.immediateFuture(daoValue); + } + ListenableFuture> cachePutFuture = cacheExecutorService.submit(() -> { + cache.put(new TsLatestCacheKey(entityId, key), daoValue.get()); + return daoValue; + }); + + Futures.addCallback(cachePutFuture, new FutureCallback<>() { + @Override + public void onSuccess(Optional result) { + log.trace("saveLatest onSuccess [{}][{}][{}]", entityId, key, result); + } + + @Override + public void onFailure(Throwable t) { + log.info("saveLatest onFailure [{}][{}][{}]", entityId, key, daoValue, t); + } + + }, MoreExecutors.directExecutor()); + return cachePutFuture; + }, MoreExecutors.directExecutor()); + }, MoreExecutors.directExecutor()); } @Override public TsKvEntry findLatestSync(TenantId tenantId, EntityId entityId, String key) { + log.trace("findLatestSync DEPRECATED [{}][{}]", entityId, key); return sqlDao.findLatestSync(tenantId, entityId, key); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TsLatestRedisCache.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TsLatestRedisCache.java index c0bca607c9..61d2c8bdb2 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TsLatestRedisCache.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TsLatestRedisCache.java @@ -16,18 +16,24 @@ package org.thingsboard.server.dao.timeseries; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.NotImplementedException; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.data.redis.connection.RedisConnection; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.connection.RedisStringCommands; import org.springframework.stereotype.Service; import org.thingsboard.server.cache.CacheSpecsMap; import org.thingsboard.server.cache.RedisTbTransactionalCache; import org.thingsboard.server.cache.TBRedisCacheConfiguration; +import org.thingsboard.server.cache.TbCacheTransaction; +import org.thingsboard.server.cache.TbCacheValueWrapper; import org.thingsboard.server.cache.TbJavaRedisSerializer; import org.thingsboard.server.common.data.CacheConstants; import org.thingsboard.server.common.data.kv.TsKvEntry; import java.io.Serializable; +import java.util.Collection; +import java.util.List; @ConditionalOnProperty(prefix = "cache", value = "type", havingValue = "redis") @Service("TsLatestCache") @@ -47,4 +53,51 @@ public class TsLatestRedisCache } } + @Override + public void putIfAbsent(TsLatestCacheKey key, TsKvEntry value) { + log.trace("putIfAbsent [{}][{}]", key, value); + throw new NotImplementedException("putIfAbsent is not supported by TsLatestRedisCache"); + } + + @Override + public TbCacheValueWrapper get(TsLatestCacheKey key) { + log.debug("get [{}]", key); + return super.get(key); + } + + @Override + protected byte[] doGet(RedisConnection connection, byte[] rawKey) { + log.trace("doGet [{}][{}]", connection, rawKey); + return connection.stringCommands().get(rawKey); + } + + @Override + public void evict(TsLatestCacheKey key){ + log.trace("evict [{}]", key); + final byte[] rawKey = getRawKey(key); + try (var connection = getConnection(rawKey)) { + connection.keyCommands().del(rawKey); + } + } + + @Override + public void evict(Collection keys) { + throw new NotImplementedException("evict by many keys is not supported by TsLatestRedisCache"); + } + + @Override + public void evictOrPut(TsLatestCacheKey key, TsKvEntry value) { + throw new NotImplementedException("evictOrPut is not supported by TsLatestRedisCache"); + } + + @Override + public TbCacheTransaction newTransactionForKey(TsLatestCacheKey key) { + throw new NotImplementedException("newTransactionForKey is not supported by TsLatestRedisCache"); + } + + @Override + public TbCacheTransaction newTransactionForKeys(List keys) { + throw new NotImplementedException("newTransactionForKeys is not supported by TsLatestRedisCache"); + } + }