TS latest dao put to Redis WIP
This commit is contained in:
parent
9466578c91
commit
44096a7cb7
@ -214,8 +214,12 @@ public abstract class RedisTbTransactionalCache<K extends Serializable, V extend
|
||||
|
||||
public void put(RedisConnection connection, K key, V value, RedisStringCommands.SetOption setOption) {
|
||||
byte[] rawKey = getRawKey(key);
|
||||
put(connection, rawKey, value, setOption);
|
||||
}
|
||||
|
||||
public void put(RedisConnection connection, byte[] rawKey, V value, RedisStringCommands.SetOption setOption) {
|
||||
byte[] rawValue = getRawValue(value);
|
||||
connection.stringCommands().set(rawKey, rawValue, cacheTtl, setOption);
|
||||
connection.stringCommands().set(rawKey, rawValue, this.cacheTtl, setOption);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -15,7 +15,10 @@
|
||||
*/
|
||||
package org.thingsboard.server.dao.sqlts;
|
||||
|
||||
import com.google.common.util.concurrent.FutureCallback;
|
||||
import com.google.common.util.concurrent.Futures;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import com.google.common.util.concurrent.MoreExecutors;
|
||||
import jakarta.annotation.PostConstruct;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
@ -63,7 +66,24 @@ public class CachedRedisSqlTimeseriesLatestDao extends BaseAbstractSqlTimeseries
|
||||
|
||||
@Override
|
||||
public ListenableFuture<Void> saveLatest(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry) {
|
||||
return sqlDao.saveLatest(tenantId, entityId, tsKvEntry);
|
||||
ListenableFuture<Void> future = sqlDao.saveLatest(tenantId, entityId, tsKvEntry);
|
||||
future = Futures.transform(future, x -> {
|
||||
cache.put(new TsLatestCacheKey(entityId, tsKvEntry.getKey()), tsKvEntry);
|
||||
return x;
|
||||
},
|
||||
cacheExecutorService);
|
||||
Futures.addCallback(future, new FutureCallback<Void>() {
|
||||
@Override
|
||||
public void onSuccess(Void result) {
|
||||
log.trace("saveLatest onSuccess [{}][{}][{}]", entityId, tsKvEntry.getKey(), tsKvEntry);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable t) {
|
||||
log.trace("saveLatest onFailure [{}][{}][{}]", entityId, tsKvEntry.getKey(), tsKvEntry, t);
|
||||
}
|
||||
}, MoreExecutors.directExecutor());
|
||||
return future;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@ -15,8 +15,10 @@
|
||||
*/
|
||||
package org.thingsboard.server.dao.timeseries;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.data.redis.connection.RedisConnectionFactory;
|
||||
import org.springframework.data.redis.connection.RedisStringCommands;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.thingsboard.server.cache.CacheSpecsMap;
|
||||
import org.thingsboard.server.cache.RedisTbTransactionalCache;
|
||||
@ -25,11 +27,24 @@ import org.thingsboard.server.cache.TbJavaRedisSerializer;
|
||||
import org.thingsboard.server.common.data.CacheConstants;
|
||||
import org.thingsboard.server.common.data.kv.TsKvEntry;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
@ConditionalOnProperty(prefix = "cache", value = "type", havingValue = "redis")
|
||||
@Service("TsLatestCache")
|
||||
public class TsLatestRedisCache extends RedisTbTransactionalCache<TsLatestCacheKey, TsKvEntry> {
|
||||
@Slf4j
|
||||
public class TsLatestRedisCache<K extends Serializable, V extends Serializable> extends RedisTbTransactionalCache<TsLatestCacheKey, TsKvEntry> {
|
||||
|
||||
public TsLatestRedisCache(TBRedisCacheConfiguration configuration, CacheSpecsMap cacheSpecsMap, RedisConnectionFactory connectionFactory) {
|
||||
super(CacheConstants.TS_LATEST_CACHE, cacheSpecsMap, connectionFactory, configuration, new TbJavaRedisSerializer<>()); }
|
||||
super(CacheConstants.TS_LATEST_CACHE, cacheSpecsMap, connectionFactory, configuration, new TbJavaRedisSerializer<>());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void put(TsLatestCacheKey key, TsKvEntry value) {
|
||||
log.trace("put [{}][{}]", key, value);
|
||||
final byte[] rawKey = getRawKey(key);
|
||||
try (var connection = getConnection(rawKey)) {
|
||||
put(connection, rawKey, value, RedisStringCommands.SetOption.UPSERT);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -10,6 +10,8 @@
|
||||
<logger name="org.thingsboard.server.dao" level="WARN"/>
|
||||
<logger name="org.testcontainers" level="INFO" />
|
||||
<logger name="org.thingsboard.server.dao.sqlts" level="INFO" />
|
||||
<logger name="org.thingsboard.server.dao.timeseries.TsLatestRedisCache" level="TRACE" />
|
||||
|
||||
|
||||
<!-- Log Hibernate SQL queries -->
|
||||
<!-- <logger name="org.hibernate.SQL" level="DEBUG"/> -->
|
||||
Loading…
x
Reference in New Issue
Block a user