Merge remote-tracking branch 'smatvienko-tb/feature/latest-ts-redis-cache-aside-dao' into feature/attr_tskv_version

This commit is contained in:
YevhenBondarenko 2024-07-02 12:56:33 +02:00
commit 6c417f209f
19 changed files with 837 additions and 40 deletions

View File

@ -491,6 +491,10 @@ cache:
attributes:
# make sure that if cache.type is 'redis' and cache.attributes.enabled is 'true' if you change 'maxmemory-policy' Redis config property to 'allkeys-lru', 'allkeys-lfu' or 'allkeys-random'
enabled: "${CACHE_ATTRIBUTES_ENABLED:true}"
ts_latest:
# Will enable cache-aside strategy for SQL timeseries latest DAO.
# make sure that if cache.type is 'redis' and cache.ts_latest.enabled is 'true' if you change 'maxmemory-policy' Redis config property to 'allkeys-lru', 'allkeys-lfu' or 'allkeys-random'
enabled: "${CACHE_TS_LATEST_ENABLED:true}"
specs:
relations:
timeToLiveInMinutes: "${CACHE_SPECS_RELATIONS_TTL:1440}" # Relations cache TTL
@ -547,6 +551,9 @@ cache:
attributes:
timeToLiveInMinutes: "${CACHE_SPECS_ATTRIBUTES_TTL:1440}" # Attributes cache TTL
maxSize: "${CACHE_SPECS_ATTRIBUTES_MAX_SIZE:100000}" # 0 means the cache is disabled
tsLatest:
timeToLiveInMinutes: "${CACHE_SPECS_TS_LATEST_TTL:1440}" # Timeseries latest cache TTL
maxSize: "${CACHE_SPECS_TS_LATEST_MAX_SIZE:100000}" # 0 means the cache is disabled
userSessionsInvalidation:
# The value of this TTL is ignored and replaced by the JWT refresh token expiration time
timeToLiveInMinutes: "0"

57
build.sh Executable file
View File

@ -0,0 +1,57 @@
#!/bin/bash
#
# Copyright © 2016-2024 The Thingsboard Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
set -e # exit on any error
#PROJECTS="msa/tb-node,msa/web-ui,rule-engine-pe/rule-node-twilio-sms"
PROJECTS=""
if [ "$1" ]; then
PROJECTS="--projects $1"
fi
echo "Building and pushing [amd64,arm64] projects '$PROJECTS' ..."
echo "HELP: usage ./build.sh [projects]"
echo "HELP: example ./build.sh msa/web-ui,msa/web-report"
java -version
#echo "Cleaning ui-ngx/node_modules" && rm -rf ui-ngx/node_modules
MAVEN_OPTS="-Xmx1024m" NODE_OPTIONS="--max_old_space_size=4096" DOCKER_CLI_EXPERIMENTAL=enabled DOCKER_BUILDKIT=0 \
mvn -T2 license:format clean install -DskipTests \
$PROJECTS --also-make
# \
# -Dpush-docker-amd-arm-images
# -Ddockerfile.skip=false -Dpush-docker-image=true
# --offline
# --projects '!msa/web-report' --also-make
# push all
# mvn -T 1C license:format clean install -DskipTests -Ddockerfile.skip=false -Dpush-docker-image=true
## Build and push AMD and ARM docker images using docker buildx
## Reference to article how to setup docker miltiplatform build environment: https://medium.com/@artur.klauser/building-multi-architecture-docker-images-with-buildx-27d80f7e2408
## install docker-ce from docker repo https://docs.docker.com/engine/install/ubuntu/
# sudo apt install -y qemu-user-static binfmt-support
# export DOCKER_CLI_EXPERIMENTAL=enabled
# docker version
# docker run --rm --privileged multiarch/qemu-user-static --reset -p yes
# docker buildx create --name mybuilder
# docker buildx use mybuilder
# docker buildx inspect --bootstrap
# docker buildx ls
# mvn clean install -P push-docker-amd-arm-images

View File

@ -51,6 +51,7 @@ public abstract class RedisTbTransactionalCache<K extends Serializable, V extend
@Getter
private final String cacheName;
@Getter
private final JedisConnectionFactory connectionFactory;
private final RedisSerializer<String> keySerializer = StringRedisSerializer.UTF_8;
private final TbRedisSerializer<K, V> valueSerializer;
@ -116,7 +117,7 @@ public abstract class RedisTbTransactionalCache<K extends Serializable, V extend
@Override
public void evict(K key) {
try (var connection = connectionFactory.getConnection()) {
connection.del(getRawKey(key));
connection.keyCommands().del(getRawKey(key));
}
}
@ -127,7 +128,7 @@ public abstract class RedisTbTransactionalCache<K extends Serializable, V extend
return;
}
try (var connection = connectionFactory.getConnection()) {
connection.del(keys.stream().map(this::getRawKey).toArray(byte[][]::new));
connection.keyCommands().del(keys.stream().map(this::getRawKey).toArray(byte[][]::new));
}
}
@ -135,10 +136,10 @@ public abstract class RedisTbTransactionalCache<K extends Serializable, V extend
public void evictOrPut(K key, V value) {
try (var connection = connectionFactory.getConnection()) {
var rawKey = getRawKey(key);
var records = connection.del(rawKey);
var records = connection.keyCommands().del(rawKey);
if (records == null || records == 0) {
//We need to put the value in case of Redis, because evict will NOT cancel concurrent transaction used to "get" the missing value from cache.
connection.set(rawKey, getRawValue(value), evictExpiration, RedisStringCommands.SetOption.UPSERT);
connection.stringCommands().set(rawKey, getRawValue(value), evictExpiration, RedisStringCommands.SetOption.UPSERT);
}
}
}
@ -171,7 +172,7 @@ public abstract class RedisTbTransactionalCache<K extends Serializable, V extend
return jedisConnection;
}
private RedisConnection watch(byte[][] rawKeysList) {
protected RedisConnection watch(byte[][] rawKeysList) {
RedisConnection connection = getConnection(rawKeysList[0]);
try {
connection.watch(rawKeysList);
@ -218,8 +219,12 @@ public abstract class RedisTbTransactionalCache<K extends Serializable, V extend
public void put(RedisConnection connection, K key, V value, RedisStringCommands.SetOption setOption) {
byte[] rawKey = getRawKey(key);
put(connection, rawKey, value, setOption);
}
public void put(RedisConnection connection, byte[] rawKey, V value, RedisStringCommands.SetOption setOption) {
byte[] rawValue = getRawValue(value);
connection.set(rawKey, rawValue, cacheTtl, setOption);
connection.stringCommands().set(rawKey, rawValue, this.cacheTtl, setOption);
}
}

View File

@ -0,0 +1,35 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.cache;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.SerializationException;
public class TbJavaRedisSerializer<K, V> implements TbRedisSerializer<K, V> {
final RedisSerializer<Object> serializer = RedisSerializer.java();
@Override
public byte[] serialize(V value) throws SerializationException {
return serializer.serialize(value);
}
@Override
public V deserialize(K key, byte[] bytes) throws SerializationException {
return (V) serializer.deserialize(bytes);
}
}

View File

@ -0,0 +1,26 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.dao.util;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
@Retention(RetentionPolicy.RUNTIME)
@ConditionalOnExpression("('${database.ts_latest.type}'=='sql' || '${database.ts_latest.type}'=='timescale') && '${cache.ts_latest.enabled:false}'=='true' && '${cache.type:caffeine}'=='redis' ")
public @interface SqlTsLatestAnyDaoCachedRedis {
}

View File

@ -36,6 +36,7 @@ public class CacheConstants {
public static final String ASSET_PROFILE_CACHE = "assetProfiles";
public static final String ATTRIBUTES_CACHE = "attributes";
public static final String TS_LATEST_CACHE = "tsLatest";
public static final String USERS_SESSION_INVALIDATION_CACHE = "userSessionsInvalidation";
public static final String OTA_PACKAGE_CACHE = "otaPackages";
public static final String OTA_PACKAGE_DATA_CACHE = "otaPackagesData";

View File

@ -216,6 +216,11 @@
<artifactId>jdbc</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context-support</artifactId>

View File

@ -0,0 +1,181 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.dao.sqlts;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import jakarta.annotation.PostConstruct;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Primary;
import org.springframework.stereotype.Component;
import org.thingsboard.server.cache.TbCacheValueWrapper;
import org.thingsboard.server.cache.TbTransactionalCache;
import org.thingsboard.server.common.data.id.DeviceProfileId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.DeleteTsKvQuery;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.data.kv.TsKvLatestRemovingResult;
import org.thingsboard.server.common.stats.DefaultCounter;
import org.thingsboard.server.common.stats.StatsFactory;
import org.thingsboard.server.dao.cache.CacheExecutorService;
import org.thingsboard.server.dao.timeseries.TimeseriesLatestDao;
import org.thingsboard.server.dao.timeseries.TsLatestCacheKey;
import org.thingsboard.server.dao.util.SqlTsLatestAnyDaoCachedRedis;
import java.util.List;
import java.util.Optional;
@Slf4j
@Component
@SqlTsLatestAnyDaoCachedRedis
@RequiredArgsConstructor
@Primary
public class CachedRedisSqlTimeseriesLatestDao extends BaseAbstractSqlTimeseriesDao implements TimeseriesLatestDao {
public static final String STATS_NAME = "ts_latest.cache";
final CacheExecutorService cacheExecutorService;
final SqlTimeseriesLatestDao sqlDao;
final StatsFactory statsFactory;
final TbTransactionalCache<TsLatestCacheKey, TsKvEntry> cache;
DefaultCounter hitCounter;
DefaultCounter missCounter;
@PostConstruct
public void init() {
log.info("Init Redis cache-aside SQL Timeseries Latest DAO");
this.hitCounter = statsFactory.createDefaultCounter(STATS_NAME, "result", "hit");
this.missCounter = statsFactory.createDefaultCounter(STATS_NAME, "result", "miss");
}
@Override
public ListenableFuture<Void> saveLatest(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry) {
ListenableFuture<Void> future = sqlDao.saveLatest(tenantId, entityId, tsKvEntry);
future = Futures.transform(future, x -> {
cache.put(new TsLatestCacheKey(entityId, tsKvEntry.getKey()), tsKvEntry);
return x;
},
cacheExecutorService);
if (log.isTraceEnabled()) {
Futures.addCallback(future, new FutureCallback<>() {
@Override
public void onSuccess(Void result) {
log.trace("saveLatest onSuccess [{}][{}][{}]", entityId, tsKvEntry.getKey(), tsKvEntry);
}
@Override
public void onFailure(Throwable t) {
log.info("saveLatest onFailure [{}][{}][{}]", entityId, tsKvEntry.getKey(), tsKvEntry, t);
}
}, MoreExecutors.directExecutor());
}
return future;
}
@Override
public ListenableFuture<TsKvLatestRemovingResult> removeLatest(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) {
ListenableFuture<TsKvLatestRemovingResult> future = sqlDao.removeLatest(tenantId, entityId, query);
future = Futures.transform(future, x -> {
cache.evict(new TsLatestCacheKey(entityId, query.getKey()));
return x;
},
cacheExecutorService);
if (log.isTraceEnabled()) {
Futures.addCallback(future, new FutureCallback<>() {
@Override
public void onSuccess(TsKvLatestRemovingResult result) {
log.trace("removeLatest onSuccess [{}][{}][{}]", entityId, query.getKey(), query);
}
@Override
public void onFailure(Throwable t) {
log.info("removeLatest onFailure [{}][{}][{}]", entityId, query.getKey(), query, t);
}
}, MoreExecutors.directExecutor());
}
return future;
}
@Override
public ListenableFuture<Optional<TsKvEntry>> findLatestOpt(TenantId tenantId, EntityId entityId, String key) {
log.trace("findLatestOpt");
return doFindLatest(tenantId, entityId, key);
}
@Override
public ListenableFuture<TsKvEntry> findLatest(TenantId tenantId, EntityId entityId, String key) {
return Futures.transform(doFindLatest(tenantId, entityId, key), x -> sqlDao.wrapNullTsKvEntry(key, x.orElse(null)), MoreExecutors.directExecutor());
}
public ListenableFuture<Optional<TsKvEntry>> doFindLatest(TenantId tenantId, EntityId entityId, String key) {
final TsLatestCacheKey cacheKey = new TsLatestCacheKey(entityId, key);
ListenableFuture<TbCacheValueWrapper<TsKvEntry>> cacheFuture = cacheExecutorService.submit(() -> cache.get(cacheKey));
return Futures.transformAsync(cacheFuture, (cacheValueWrap) -> {
if (cacheValueWrap != null) {
final TsKvEntry tsKvEntry = cacheValueWrap.get();
log.debug("findLatest cache hit [{}][{}][{}]", entityId, key, tsKvEntry);
return Futures.immediateFuture(Optional.ofNullable(tsKvEntry));
}
log.debug("findLatest cache miss [{}][{}]", entityId, key);
ListenableFuture<Optional<TsKvEntry>> daoFuture = sqlDao.findLatestOpt(tenantId,entityId, key);
return Futures.transformAsync(daoFuture, (daoValue) -> {
if (daoValue.isEmpty()) {
//TODO implement the cache logic if no latest found in TS DAO. Currently we are always getting from DB to stay on the safe side
return Futures.immediateFuture(daoValue);
}
ListenableFuture<Optional<TsKvEntry>> cachePutFuture = cacheExecutorService.submit(() -> {
cache.put(new TsLatestCacheKey(entityId, key), daoValue.get());
return daoValue;
});
Futures.addCallback(cachePutFuture, new FutureCallback<>() {
@Override
public void onSuccess(Optional<TsKvEntry> result) {
log.trace("saveLatest onSuccess [{}][{}][{}]", entityId, key, result);
}
@Override
public void onFailure(Throwable t) {
log.info("saveLatest onFailure [{}][{}][{}]", entityId, key, daoValue, t);
}
}, MoreExecutors.directExecutor());
return cachePutFuture;
}, MoreExecutors.directExecutor());
}, MoreExecutors.directExecutor());
}
@Override
public ListenableFuture<List<TsKvEntry>> findAllLatest(TenantId tenantId, EntityId entityId) {
return sqlDao.findAllLatest(tenantId, entityId);
}
@Override
public List<String> findAllKeysByDeviceProfileId(TenantId tenantId, DeviceProfileId deviceProfileId) {
return sqlDao.findAllKeysByDeviceProfileId(tenantId, deviceProfileId);
}
@Override
public List<String> findAllKeysByEntityIds(TenantId tenantId, List<EntityId> entityIds) {
return sqlDao.findAllKeysByEntityIds(tenantId, entityIds);
}
}

View File

@ -157,12 +157,13 @@ public class SqlTimeseriesLatestDao extends BaseAbstractSqlTimeseriesDao impleme
@Override
public ListenableFuture<Optional<TsKvEntry>> findLatestOpt(TenantId tenantId, EntityId entityId, String key) {
return service.submit(() -> Optional.ofNullable(doFindLatest(entityId, key)));
return service.submit(() -> Optional.ofNullable(doFindLatestSync(entityId, key)));
}
@Override
public ListenableFuture<TsKvEntry> findLatest(TenantId tenantId, EntityId entityId, String key) {
return service.submit(() -> getLatestTsKvEntry(entityId, key));
log.trace("findLatest [{}][{}][{}]", tenantId, entityId, key);
return service.submit(() -> wrapNullTsKvEntry(key, doFindLatestSync(entityId, key)));
}
@Override
@ -206,7 +207,7 @@ public class SqlTimeseriesLatestDao extends BaseAbstractSqlTimeseriesDao impleme
ReadTsKvQueryResult::getData, MoreExecutors.directExecutor());
}
protected TsKvEntry doFindLatest(EntityId entityId, String key) {
protected TsKvEntry doFindLatestSync(EntityId entityId, String key) {
TsKvLatestCompositeKey compositeKey =
new TsKvLatestCompositeKey(
entityId.getId(),
@ -222,7 +223,7 @@ public class SqlTimeseriesLatestDao extends BaseAbstractSqlTimeseriesDao impleme
}
protected ListenableFuture<TsKvLatestRemovingResult> getRemoveLatestFuture(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) {
ListenableFuture<TsKvEntry> latestFuture = service.submit(() -> doFindLatest(entityId, query.getKey()));
ListenableFuture<TsKvEntry> latestFuture = service.submit(() -> doFindLatestSync(entityId, query.getKey()));
return Futures.transformAsync(latestFuture, latest -> {
if (latest == null) {
return Futures.immediateFuture(new TsKvLatestRemovingResult(query.getKey(), false));
@ -263,10 +264,9 @@ public class SqlTimeseriesLatestDao extends BaseAbstractSqlTimeseriesDao impleme
return tsLatestQueue.add(latestEntity);
}
private TsKvEntry getLatestTsKvEntry(EntityId entityId, String key) {
TsKvEntry latest = doFindLatest(entityId, key);
protected TsKvEntry wrapNullTsKvEntry(final String key, final TsKvEntry latest) {
if (latest == null) {
latest = new BasicTsKvEntry(System.currentTimeMillis(), new StringDataEntry(key, null));
return new BasicTsKvEntry(System.currentTimeMillis(), new StringDataEntry(key, null));
}
return latest;
}

View File

@ -0,0 +1,40 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.dao.timeseries;
import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import org.thingsboard.server.common.data.AttributeScope;
import org.thingsboard.server.common.data.id.EntityId;
import java.io.Serial;
import java.io.Serializable;
@EqualsAndHashCode
@Getter
@AllArgsConstructor
public class TsLatestCacheKey implements Serializable {
private static final long serialVersionUID = 2024369077925351881L;
private final EntityId entityId;
private final String key;
@Override
public String toString() {
return "{" + entityId + "}" + key;
}
}

View File

@ -0,0 +1,148 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.dao.timeseries;
import jakarta.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.NotImplementedException;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.dao.InvalidDataAccessApiUsageException;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.ReturnType;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.stereotype.Service;
import org.thingsboard.server.cache.CacheSpecsMap;
import org.thingsboard.server.cache.RedisTbTransactionalCache;
import org.thingsboard.server.cache.TBRedisCacheConfiguration;
import org.thingsboard.server.cache.TbCacheTransaction;
import org.thingsboard.server.cache.TbCacheValueWrapper;
import org.thingsboard.server.cache.TbJavaRedisSerializer;
import org.thingsboard.server.common.data.CacheConstants;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Set;
@ConditionalOnProperty(prefix = "cache", value = "type", havingValue = "redis")
@Service("TsLatestCache")
@Slf4j
public class TsLatestRedisCache<K extends Serializable, V extends Serializable> extends RedisTbTransactionalCache<TsLatestCacheKey, TsKvEntry> {
static final byte[] UPSERT_TS_LATEST_LUA_SCRIPT = StringRedisSerializer.UTF_8.serialize("" +
"redis.call('ZREMRANGEBYSCORE', KEYS[1], ARGV[1], ARGV[1]); " +
"redis.call('ZADD', KEYS[1], ARGV[1], ARGV[2]); " +
"local current_size = redis.call('ZCARD', KEYS[1]); " +
"if current_size > 1 then" +
" redis.call('ZREMRANGEBYRANK', KEYS[1], 0, -2) " +
"end;");
static final byte[] UPSERT_TS_LATEST_SHA = StringRedisSerializer.UTF_8.serialize("24e226c3ea34e3e850113e8eb1f3cd2b88171988");
public TsLatestRedisCache(TBRedisCacheConfiguration configuration, CacheSpecsMap cacheSpecsMap, RedisConnectionFactory connectionFactory) {
super(CacheConstants.TS_LATEST_CACHE, cacheSpecsMap, connectionFactory, configuration, new TbJavaRedisSerializer<>());
}
@PostConstruct
public void init() {
try (var connection = getConnection(UPSERT_TS_LATEST_SHA)) {
log.debug("Loading LUA with expected SHA[{}], connection [{}]", new String(UPSERT_TS_LATEST_SHA), connection.getNativeConnection());
String sha = connection.scriptingCommands().scriptLoad(UPSERT_TS_LATEST_LUA_SCRIPT);
if (!Arrays.equals(UPSERT_TS_LATEST_SHA, StringRedisSerializer.UTF_8.serialize(sha))) {
log.error("SHA for UPSERT_TS_LATEST_LUA_SCRIPT wrong! Expected [{}], but actual [{}], connection [{}]", new String(UPSERT_TS_LATEST_SHA), sha, connection.getNativeConnection());
}
} catch (Throwable t) {
log.error("Error on Redis TS Latest cache init", t);
}
}
@Override
public TbCacheValueWrapper<TsKvEntry> get(TsLatestCacheKey key) {
log.debug("get [{}]", key);
return super.get(key);
}
@Override
protected byte[] doGet(RedisConnection connection, byte[] rawKey) {
log.trace("doGet [{}][{}]", connection, rawKey);
Set<byte[]> values = connection.commands().zRange(rawKey, -1, -1);
return values == null ? null : values.stream().findFirst().orElse(null);
}
@Override
public void put(TsLatestCacheKey key, TsKvEntry value) {
log.trace("put [{}][{}]", key, value);
final byte[] rawKey = getRawKey(key);
try (var connection = getConnection(rawKey)) {
byte[] rawValue = getRawValue(value);
byte[] ts = StringRedisSerializer.UTF_8.serialize(String.valueOf(value.toTsValue().getTs()));
try {
connection.scriptingCommands().evalSha(UPSERT_TS_LATEST_SHA, ReturnType.VALUE, 1, rawKey, ts, rawValue);
} catch (InvalidDataAccessApiUsageException e) {
log.debug("loading LUA [{}]", connection.getNativeConnection());
String sha = connection.scriptingCommands().scriptLoad(UPSERT_TS_LATEST_LUA_SCRIPT);
if (!Arrays.equals(UPSERT_TS_LATEST_SHA, StringRedisSerializer.UTF_8.serialize(sha))) {
log.error("SHA for UPSERT_TS_LATEST_LUA_SCRIPT wrong! Expected [{}], but actual [{}]", new String(UPSERT_TS_LATEST_SHA), sha);
}
try {
connection.scriptingCommands().evalSha(UPSERT_TS_LATEST_SHA, ReturnType.VALUE, 1, rawKey, ts, rawValue);
} catch (InvalidDataAccessApiUsageException ignored) {
log.debug("Slowly executing eval instead of fast evalsha");
connection.scriptingCommands().eval(UPSERT_TS_LATEST_LUA_SCRIPT, ReturnType.VALUE, 1, rawKey, ts, rawValue);
}
}
}
}
@Override
public void evict(TsLatestCacheKey key) {
log.trace("evict [{}]", key);
final byte[] rawKey = getRawKey(key);
try (var connection = getConnection(rawKey)) {
connection.keyCommands().del(rawKey);
}
}
@Override
public void putIfAbsent(TsLatestCacheKey key, TsKvEntry value) {
log.trace("putIfAbsent [{}][{}]", key, value);
throw new NotImplementedException("putIfAbsent is not supported by TsLatestRedisCache");
}
@Override
public void evict(Collection<TsLatestCacheKey> keys) {
throw new NotImplementedException("evict by many keys is not supported by TsLatestRedisCache");
}
@Override
public void evictOrPut(TsLatestCacheKey key, TsKvEntry value) {
throw new NotImplementedException("evictOrPut is not supported by TsLatestRedisCache");
}
@Override
public TbCacheTransaction<TsLatestCacheKey, TsKvEntry> newTransactionForKey(TsLatestCacheKey key) {
throw new NotImplementedException("newTransactionForKey is not supported by TsLatestRedisCache");
}
@Override
public TbCacheTransaction<TsLatestCacheKey, TsKvEntry> newTransactionForKeys(List<TsLatestCacheKey> keys) {
throw new NotImplementedException("newTransactionForKeys is not supported by TsLatestRedisCache");
}
}

View File

@ -0,0 +1,90 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.dao;
import lombok.extern.slf4j.Slf4j;
import org.junit.ClassRule;
import org.junit.rules.ExternalResource;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.output.OutputFrame;
import redis.clients.jedis.Jedis;
import java.util.List;
import java.util.concurrent.TimeUnit;
@Slf4j
public class AbstractRedisClusterContainer {
static final String nodes = "127.0.0.1:6371,127.0.0.1:6372,127.0.0.1:6373,127.0.0.1:6374,127.0.0.1:6375,127.0.0.1:6376";
@ClassRule(order = 0)
public static Network network = Network.newNetwork();
@ClassRule(order = 1)
public static GenericContainer redis1 = new GenericContainer("bitnami/redis-cluster:latest").withEnv("REDIS_PORT_NUMBER","6371").withNetworkMode("host").withLogConsumer(x->log.warn("{}", ((OutputFrame)x).getUtf8StringWithoutLineEnding())).withEnv("ALLOW_EMPTY_PASSWORD","yes").withEnv("REDIS_NODES",nodes);
@ClassRule(order = 2)
public static GenericContainer redis2 = new GenericContainer("bitnami/redis-cluster:latest").withEnv("REDIS_PORT_NUMBER","6372").withNetworkMode("host").withLogConsumer(x->log.warn("{}", ((OutputFrame)x).getUtf8StringWithoutLineEnding())).withEnv("ALLOW_EMPTY_PASSWORD","yes").withEnv("REDIS_NODES",nodes);
@ClassRule(order = 3)
public static GenericContainer redis3 = new GenericContainer("bitnami/redis-cluster:latest").withEnv("REDIS_PORT_NUMBER","6373").withNetworkMode("host").withLogConsumer(x->log.warn("{}", ((OutputFrame)x).getUtf8StringWithoutLineEnding())).withEnv("ALLOW_EMPTY_PASSWORD","yes").withEnv("REDIS_NODES",nodes);
@ClassRule(order = 4)
public static GenericContainer redis4 = new GenericContainer("bitnami/redis-cluster:latest").withEnv("REDIS_PORT_NUMBER","6374").withNetworkMode("host").withLogConsumer(x->log.warn("{}", ((OutputFrame)x).getUtf8StringWithoutLineEnding())).withEnv("ALLOW_EMPTY_PASSWORD","yes").withEnv("REDIS_NODES",nodes);
@ClassRule(order = 5)
public static GenericContainer redis5 = new GenericContainer("bitnami/redis-cluster:latest").withEnv("REDIS_PORT_NUMBER","6375").withNetworkMode("host").withLogConsumer(x->log.warn("{}", ((OutputFrame)x).getUtf8StringWithoutLineEnding())).withEnv("ALLOW_EMPTY_PASSWORD","yes").withEnv("REDIS_NODES",nodes);
@ClassRule(order = 6)
public static GenericContainer redis6 = new GenericContainer("bitnami/redis-cluster:latest").withEnv("REDIS_PORT_NUMBER","6376").withNetworkMode("host").withLogConsumer(x->log.warn("{}", ((OutputFrame)x).getUtf8StringWithoutLineEnding())).withEnv("ALLOW_EMPTY_PASSWORD","yes").withEnv("REDIS_NODES",nodes);
@ClassRule(order = 100)
public static ExternalResource resource = new ExternalResource() {
@Override
protected void before() throws Throwable {
redis1.start();
redis2.start();
redis3.start();
redis4.start();
redis5.start();
redis6.start();
Thread.sleep(TimeUnit.SECONDS.toMillis(5)); // otherwise not all containers have time to start
String clusterCreateCommand = "echo yes | redis-cli --cluster create " +
"127.0.0.1:6371 127.0.0.1:6372 127.0.0.1:6373 127.0.0.1:6374 127.0.0.1:6375 127.0.0.1:6376 " +
"--cluster-replicas 1";
log.warn("Command to init Redis Cluster: {}", clusterCreateCommand);
var result = redis6.execInContainer("/bin/sh", "-c", clusterCreateCommand);
log.warn("Init cluster result: {}", result);
log.warn("Connect to nodes: {}", nodes);
System.setProperty("cache.type", "redis");
System.setProperty("redis.connection.type", "cluster");
System.setProperty("redis.cluster.nodes", nodes);
System.setProperty("redis.cluster.useDefaultPoolConfig", "false");
}
@Override
protected void after() {
redis1.stop();
redis2.stop();
redis3.stop();
redis4.stop();
redis5.stop();
redis6.stop();
List.of("cache.type", "redis.connection.type", "redis.cluster.nodes", "redis.cluster.useDefaultPoolConfig\"")
.forEach(System.getProperties()::remove);
}
};
}

View File

@ -0,0 +1,31 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.dao;
import org.junit.extensions.cpsuite.ClasspathSuite;
import org.junit.extensions.cpsuite.ClasspathSuite.ClassnameFilters;
import org.junit.runner.RunWith;
@RunWith(ClasspathSuite.class)
@ClassnameFilters(
//All the same tests using redis instead of caffeine.
{
"org.thingsboard.server.dao.service.*ServiceSqlTest",
}
)
public class RedisClusterSqlTestSuite extends AbstractRedisClusterContainer {
}

View File

@ -0,0 +1,64 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.dao;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.output.OutputFrame;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import java.util.List;
import static org.assertj.core.api.Assertions.assertThat;
@Testcontainers
@Slf4j
public class RedisJUnit5Test {
@Container
private static final GenericContainer REDIS = new GenericContainer("redis:7.2-bookworm")
.withLogConsumer(s -> log.error(((OutputFrame) s).getUtf8String().trim()))
.withExposedPorts(6379);
@BeforeAll
static void beforeAll() {
log.warn("Starting redis...");
REDIS.start();
System.setProperty("cache.type", "redis");
System.setProperty("redis.connection.type", "standalone");
System.setProperty("redis.standalone.host", REDIS.getHost());
System.setProperty("redis.standalone.port", String.valueOf(REDIS.getMappedPort(6379)));
}
@AfterAll
static void afterAll() {
List.of("cache.type", "redis.connection.type", "redis.standalone.host", "redis.standalone.port")
.forEach(System.getProperties()::remove);
REDIS.stop();
log.warn("Redis is stopped");
}
@Test
void test() {
assertThat(REDIS.isRunning()).isTrue();
}
}

View File

@ -17,6 +17,7 @@ package org.thingsboard.server.dao.service.timeseries;
import com.datastax.oss.driver.api.core.uuid.Uuids;
import lombok.extern.slf4j.Slf4j;
import org.assertj.core.data.Offset;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@ -32,6 +33,7 @@ import org.thingsboard.server.common.data.kv.BaseDeleteTsKvQuery;
import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery;
import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
import org.thingsboard.server.common.data.kv.BooleanDataEntry;
import org.thingsboard.server.common.data.kv.DataType;
import org.thingsboard.server.common.data.kv.DoubleDataEntry;
import org.thingsboard.server.common.data.kv.JsonDataEntry;
import org.thingsboard.server.common.data.kv.KvEntry;
@ -50,15 +52,14 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
/**
* @author Andrew Shvayka
@ -89,6 +90,7 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
KvEntry booleanKvEntry = new BooleanDataEntry(BOOLEAN_KEY, Boolean.TRUE);
protected TenantId tenantId;
DeviceId deviceId = new DeviceId(Uuids.timeBased());
@Before
public void before() {
@ -106,8 +108,6 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
@Test
public void testFindAllLatest() throws Exception {
DeviceId deviceId = new DeviceId(Uuids.timeBased());
saveEntries(deviceId, TS - 2);
saveEntries(deviceId, TS - 1);
saveEntries(deviceId, TS);
@ -150,8 +150,6 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
@Test
public void testFindLatest() throws Exception {
DeviceId deviceId = new DeviceId(Uuids.timeBased());
saveEntries(deviceId, TS - 2);
saveEntries(deviceId, TS - 1);
saveEntries(deviceId, TS);
@ -162,9 +160,71 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
}
@Test
public void testFindLatestWithoutLatestUpdate() throws Exception {
DeviceId deviceId = new DeviceId(Uuids.timeBased());
public void testFindLatestOpt_givenSaveWithHistoricalNonOrderedTS() throws Exception {
save(tenantId, deviceId, toTsEntry(TS - 1, stringKvEntry));
save(tenantId, deviceId, toTsEntry(TS, stringKvEntry));
save(tenantId, deviceId, toTsEntry(TS - 10, stringKvEntry));
save(tenantId, deviceId, toTsEntry(TS - 11, stringKvEntry));
Optional<TsKvEntry> entryOpt = tsService.findLatest(tenantId, deviceId, STRING_KEY).get(MAX_TIMEOUT, TimeUnit.SECONDS);
assertThat(entryOpt).isNotNull().isPresent();
Assert.assertEquals(toTsEntry(TS, stringKvEntry), entryOpt.orElse(null));
}
@Test
public void testFindLatestOpt_givenSaveWithSameTSOverwriteValue() throws Exception {
save(tenantId, deviceId, toTsEntry(TS, new StringDataEntry(STRING_KEY, "old")));
save(tenantId, deviceId, toTsEntry(TS, new StringDataEntry(STRING_KEY, "new")));
Optional<TsKvEntry> entryOpt = tsService.findLatest(tenantId, deviceId, STRING_KEY).get(MAX_TIMEOUT, TimeUnit.SECONDS);
assertThat(entryOpt).isNotNull().isPresent();
Assert.assertEquals(toTsEntry(TS, new StringDataEntry(STRING_KEY, "new")), entryOpt.orElse(null));
}
public void testFindLatestOpt_givenSaveWithSameTSOverwriteTypeAndValue() throws Exception {
save(tenantId, deviceId, toTsEntry(TS, new JsonDataEntry("temp", "{\"hello\":\"world\"}")));
save(tenantId, deviceId, toTsEntry(TS, new BooleanDataEntry("temp", true)));
save(tenantId, deviceId, toTsEntry(TS, new LongDataEntry("temp", 100L)));
save(tenantId, deviceId, toTsEntry(TS, new DoubleDataEntry("temp", Math.PI)));
save(tenantId, deviceId, toTsEntry(TS, new StringDataEntry("temp", "NOOP")));
Optional<TsKvEntry> entryOpt = tsService.findLatest(tenantId, deviceId, STRING_KEY).get(MAX_TIMEOUT, TimeUnit.SECONDS);
assertThat(entryOpt).isNotNull().isPresent();
Assert.assertEquals(toTsEntry(TS, new StringDataEntry("temp", "NOOP")), entryOpt.orElse(null));
}
@Test
public void testFindLatestOpt() throws Exception {
saveEntries(deviceId, TS - 2);
saveEntries(deviceId, TS - 1);
saveEntries(deviceId, TS);
Optional<TsKvEntry> entryOpt = tsService.findLatest(tenantId, deviceId, STRING_KEY).get(MAX_TIMEOUT, TimeUnit.SECONDS);
assertThat(entryOpt).isNotNull().isPresent();
Assert.assertEquals(toTsEntry(TS, stringKvEntry), entryOpt.get());
}
@Test
public void testFindLatest_NotFound() throws Exception {
List<TsKvEntry> entries = tsService.findLatest(tenantId, deviceId, Collections.singleton(STRING_KEY)).get(MAX_TIMEOUT, TimeUnit.SECONDS);
assertThat(entries).hasSize(1);
TsKvEntry tsKvEntry = entries.get(0);
assertThat(tsKvEntry).isNotNull();
// null ts latest representation
assertThat(tsKvEntry.getKey()).isEqualTo(STRING_KEY);
assertThat(tsKvEntry.getDataType()).isEqualTo(DataType.STRING);
assertThat(tsKvEntry.getValue()).isNull();
assertThat(tsKvEntry.getTs()).isCloseTo(System.currentTimeMillis(), Offset.offset(TimeUnit.MINUTES.toMillis(1)));
}
@Test
public void testFindLatestOpt_NotFound() throws Exception {
Optional<TsKvEntry> entryOpt = tsService.findLatest(tenantId, deviceId, STRING_KEY).get(MAX_TIMEOUT, TimeUnit.SECONDS);
assertThat(entryOpt).isNotNull().isNotPresent();
}
@Test
public void testFindLatestWithoutLatestUpdate() throws Exception {
saveEntries(deviceId, TS - 2);
saveEntries(deviceId, TS - 1);
saveEntriesWithoutLatest(deviceId, TS);
@ -176,8 +236,6 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
@Test
public void testFindByQueryAscOrder() throws Exception {
DeviceId deviceId = new DeviceId(Uuids.timeBased());
saveEntries(deviceId, TS - 3);
saveEntries(deviceId, TS - 2);
saveEntries(deviceId, TS - 1);
@ -202,7 +260,6 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
@Test
public void testFindByQuery_whenPeriodEqualsOneMilisecondPeriod() throws Exception {
DeviceId deviceId = new DeviceId(Uuids.timeBased());
saveEntries(deviceId, TS - 1L);
saveEntries(deviceId, TS);
saveEntries(deviceId, TS + 1L);
@ -222,7 +279,6 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
@Test
public void testFindByQuery_whenPeriodEqualsInterval() throws Exception {
DeviceId deviceId = new DeviceId(Uuids.timeBased());
saveEntries(deviceId, TS - 1L);
for (long i = TS; i <= TS + 100L; i += 10L) {
saveEntries(deviceId, i);
@ -244,7 +300,6 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
@Test
public void testFindByQuery_whenPeriodHaveTwoIntervalWithEqualsLength() throws Exception {
DeviceId deviceId = new DeviceId(Uuids.timeBased());
saveEntries(deviceId, TS - 1L);
for (long i = TS; i <= TS + 100000L; i += 10000L) {
saveEntries(deviceId, i);
@ -268,7 +323,6 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
@Test
public void testFindByQuery_whenPeriodHaveTwoInterval_whereSecondShorterThanFirst() throws Exception {
DeviceId deviceId = new DeviceId(Uuids.timeBased());
saveEntries(deviceId, TS - 1L);
for (long i = TS; i <= TS + 80000L; i += 10000L) {
saveEntries(deviceId, i);
@ -292,7 +346,6 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
@Test
public void testFindByQuery_whenPeriodHaveTwoIntervalWithEqualsLength_whereNotAllEntriesInRange() throws Exception {
DeviceId deviceId = new DeviceId(Uuids.timeBased());
for (long i = TS - 1L; i <= TS + 100000L + 1L; i += 10000) {
saveEntries(deviceId, i);
}
@ -314,7 +367,6 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
@Test
public void testFindByQuery_whenPeriodHaveTwoInterval_whereSecondShorterThanFirst_andNotAllEntriesInRange() throws Exception {
DeviceId deviceId = new DeviceId(Uuids.timeBased());
for (long i = TS - 1L; i <= TS + 100000L + 1L; i += 10000L) {
saveEntries(deviceId, i);
}
@ -336,8 +388,6 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
@Test
public void testFindByQueryDescOrder() throws Exception {
DeviceId deviceId = new DeviceId(Uuids.timeBased());
saveEntries(deviceId, TS - 3);
saveEntries(deviceId, TS - 2);
saveEntries(deviceId, TS - 1);
@ -362,7 +412,6 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
@Test
public void testFindAllByQueries_verifyQueryId() throws Exception {
DeviceId deviceId = new DeviceId(Uuids.timeBased());
saveEntries(deviceId, TS);
saveEntries(deviceId, TS - 2);
saveEntries(deviceId, TS - 10);
@ -373,7 +422,6 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
@Test
public void testFindAllByQueries_verifyQueryId_forEntityView() throws Exception {
DeviceId deviceId = new DeviceId(Uuids.timeBased());
saveEntries(deviceId, TS);
saveEntries(deviceId, TS - 2);
saveEntries(deviceId, TS - 12);
@ -392,8 +440,6 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
@Test
public void testDeleteDeviceTsDataWithOverwritingLatest() throws Exception {
DeviceId deviceId = new DeviceId(Uuids.timeBased());
saveEntries(deviceId, 10000);
saveEntries(deviceId, 20000);
saveEntries(deviceId, 30000);
@ -412,7 +458,6 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
@Test
public void testFindDeviceTsData() throws Exception {
DeviceId deviceId = new DeviceId(Uuids.timeBased());
List<TsKvEntry> entries = new ArrayList<>();
entries.add(save(deviceId, 5000, 100));
@ -563,7 +608,6 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
@Test
public void testFindDeviceLongAndDoubleTsData() throws Exception {
DeviceId deviceId = new DeviceId(Uuids.timeBased());
List<TsKvEntry> entries = new ArrayList<>();
entries.add(save(deviceId, 5000, 100));
@ -654,8 +698,6 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
@Test
public void testSaveTs_RemoveTs_AndSaveTsAgain() throws Exception {
DeviceId deviceId = new DeviceId(Uuids.timeBased());
save(deviceId, 2000000L, 95);
save(deviceId, 4000000L, 100);
save(deviceId, 6000000L, 105);
@ -686,7 +728,6 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
BasicTsKvEntry jsonEntry = new BasicTsKvEntry(TimeUnit.MINUTES.toMillis(5), new JsonDataEntry("test", "{\"test\":\"testValue\"}"));
List<TsKvEntry> timeseries = List.of(booleanEntry, stringEntry, longEntry, doubleEntry, jsonEntry);
DeviceId deviceId = new DeviceId(Uuids.timeBased());
for (TsKvEntry tsKvEntry : timeseries) {
save(tenantId, deviceId, tsKvEntry);
}

View File

@ -0,0 +1,45 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.dao.timeseries;
import lombok.SneakyThrows;
import org.junit.jupiter.api.Test;
import java.security.MessageDigest;
import static org.assertj.core.api.Assertions.assertThat;
class TsLatestRedisCacheTest {
@Test
void testUpsertTsLatestLUAScriptHash() {
assertThat(getSHA1(TsLatestRedisCache.UPSERT_TS_LATEST_LUA_SCRIPT)).isEqualTo(new String(TsLatestRedisCache.UPSERT_TS_LATEST_SHA));
}
@SneakyThrows
String getSHA1(byte[] script) {
MessageDigest md = MessageDigest.getInstance("SHA-1");
byte[] hash = md.digest(script);
StringBuilder sb = new StringBuilder();
for (byte b : hash) {
sb.append(String.format("%02x", b));
}
return sb.toString();
}
}

View File

@ -10,6 +10,7 @@ audit-log.sink.type=none
#cache.type=caffeine # will be injected redis by RedisContainer or will be default (caffeine)
cache.maximumPoolSize=16
cache.attributes.enabled=true
cache.ts_latest.enabled=true
cache.specs.relations.timeToLiveInMinutes=1440
cache.specs.relations.maxSize=100000
@ -59,6 +60,9 @@ cache.specs.assetProfiles.maxSize=100000
cache.specs.attributes.timeToLiveInMinutes=1440
cache.specs.attributes.maxSize=100000
cache.specs.tsLatest.timeToLiveInMinutes=1440
cache.specs.tsLatest.maxSize=100000
cache.specs.tokensOutdatageTime.timeToLiveInMinutes=1440
cache.specs.tokensOutdatageTime.maxSize=100000

View File

@ -9,6 +9,11 @@
<logger name="org.thingsboard.server.dao" level="WARN"/>
<logger name="org.testcontainers" level="INFO" />
<logger name="org.thingsboard.server.dao.sqlts" level="INFO" />
<logger name="org.thingsboard.server.dao.sqlts.CachedRedisSqlTimeseriesLatestDao" level="DEBUG" />
<logger name="org.thingsboard.server.dao.sqlts.SqlTimeseriesLatestDao" level="TRACE" />
<logger name="org.thingsboard.server.dao.timeseries.TsLatestRedisCache" level="TRACE" />
<!-- Log Hibernate SQL queries -->
<!-- <logger name="org.hibernate.SQL" level="DEBUG"/> -->

12
pom.xml
View File

@ -2162,6 +2162,18 @@
<version>${testcontainers-junit4-mock.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>junit-jupiter</artifactId>
<version>${testcontainers.version}</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.zeroturnaround</groupId>
<artifactId>zt-exec</artifactId>