From d9b77251133551e4b8f933fe0b9a268d06730877 Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Tue, 26 Mar 2024 14:24:18 +0100 Subject: [PATCH 01/16] WIP Redis junit5 test container --- dao/pom.xml | 5 ++ .../server/dao/RedisJUnit5Test.java | 64 +++++++++++++++++++ pom.xml | 12 ++++ 3 files changed, 81 insertions(+) create mode 100644 dao/src/test/java/org/thingsboard/server/dao/RedisJUnit5Test.java diff --git a/dao/pom.xml b/dao/pom.xml index ab62d8f013..700d2b0135 100644 --- a/dao/pom.xml +++ b/dao/pom.xml @@ -216,6 +216,11 @@ jdbc test + + org.testcontainers + junit-jupiter + test + org.springframework spring-context-support diff --git a/dao/src/test/java/org/thingsboard/server/dao/RedisJUnit5Test.java b/dao/src/test/java/org/thingsboard/server/dao/RedisJUnit5Test.java new file mode 100644 index 0000000000..43e788cccb --- /dev/null +++ b/dao/src/test/java/org/thingsboard/server/dao/RedisJUnit5Test.java @@ -0,0 +1,64 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao; + +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.output.OutputFrame; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +@Testcontainers +@Slf4j +public class RedisJUnit5Test { + + @Container + private static final GenericContainer REDIS = new GenericContainer("redis:7.2-bookworm") + .withLogConsumer(s -> log.error(((OutputFrame) s).getUtf8String().trim())) + .withExposedPorts(6379); + + @BeforeAll + static void beforeAll() { + log.warn("Starting redis..."); + REDIS.start(); + System.setProperty("cache.type", "redis"); + System.setProperty("redis.connection.type", "standalone"); + System.setProperty("redis.standalone.host", REDIS.getHost()); + System.setProperty("redis.standalone.port", String.valueOf(REDIS.getMappedPort(6379))); + + } + + @AfterAll + static void afterAll() { + List.of("cache.type", "redis.connection.type", "redis.standalone.host", "redis.standalone.port") + .forEach(System.getProperties()::remove); + REDIS.stop(); + log.warn("Redis is stopped"); + } + + @Test + void test() { + assertThat(REDIS.isRunning()).isTrue(); + } + +} diff --git a/pom.xml b/pom.xml index d2450e0a65..70a35d77d2 100755 --- a/pom.xml +++ b/pom.xml @@ -2021,6 +2021,18 @@ + + org.testcontainers + junit-jupiter + ${testcontainers.version} + test + + + junit + junit + + + org.zeroturnaround zt-exec From 3a86913e24068f020ed7f274c09bd483b6cd39d5 Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Wed, 3 Apr 2024 10:05:01 +0200 Subject: [PATCH 02/16] RedisTbTransactionalCache refactored deprecated methods --- .../cache/RedisTbTransactionalCache.java | 21 +++++++++---------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/common/cache/src/main/java/org/thingsboard/server/cache/RedisTbTransactionalCache.java b/common/cache/src/main/java/org/thingsboard/server/cache/RedisTbTransactionalCache.java index abfbb398c9..cd5db13995 100644 --- a/common/cache/src/main/java/org/thingsboard/server/cache/RedisTbTransactionalCache.java +++ b/common/cache/src/main/java/org/thingsboard/server/cache/RedisTbTransactionalCache.java @@ -29,7 +29,6 @@ import org.springframework.data.redis.core.types.Expiration; import org.springframework.data.redis.serializer.RedisSerializer; import org.springframework.data.redis.serializer.StringRedisSerializer; import org.thingsboard.server.common.data.FstStatsService; -import redis.clients.jedis.Connection; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; import redis.clients.jedis.util.JedisClusterCRC16; @@ -79,7 +78,7 @@ public abstract class RedisTbTransactionalCache get(K key) { try (var connection = connectionFactory.getConnection()) { byte[] rawKey = getRawKey(key); - byte[] rawValue = connection.get(rawKey); + byte[] rawValue = connection.stringCommands().get(rawKey); if (rawValue == null) { return null; } else if (Arrays.equals(rawValue, BINARY_NULL_VALUE)) { @@ -113,7 +112,7 @@ public abstract class RedisTbTransactionalCache(this, connection); } - private RedisConnection getConnection(byte[] rawKey) { + protected RedisConnection getConnection(byte[] rawKey) { if (!connectionFactory.isRedisClusterAware()) { return connectionFactory.getConnection(); } @@ -168,7 +167,7 @@ public abstract class RedisTbTransactionalCache Date: Wed, 3 Apr 2024 11:46:03 +0200 Subject: [PATCH 03/16] WIP CachedRedisSqlTimeseriesLatestDao.java --- .../src/main/resources/thingsboard.yml | 7 ++ build.sh | 57 ++++++++++ .../server/cache/TbJavaRedisSerializer.java | 35 ++++++ .../util/SqlTsLatestAnyDaoCachedRedis.java | 26 +++++ .../server/common/data/CacheConstants.java | 1 + .../CachedRedisSqlTimeseriesLatestDao.java | 104 ++++++++++++++++++ .../dao/timeseries/TsLatestCacheKey.java | 40 +++++++ .../dao/timeseries/TsLatestRedisCache.java | 35 ++++++ .../resources/application-test.properties | 4 + dao/src/test/resources/logback.xml | 1 + 10 files changed, 310 insertions(+) create mode 100755 build.sh create mode 100644 common/cache/src/main/java/org/thingsboard/server/cache/TbJavaRedisSerializer.java create mode 100644 common/dao-api/src/main/java/org/thingsboard/server/dao/util/SqlTsLatestAnyDaoCachedRedis.java create mode 100644 dao/src/main/java/org/thingsboard/server/dao/sqlts/CachedRedisSqlTimeseriesLatestDao.java create mode 100644 dao/src/main/java/org/thingsboard/server/dao/timeseries/TsLatestCacheKey.java create mode 100644 dao/src/main/java/org/thingsboard/server/dao/timeseries/TsLatestRedisCache.java diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index daf9f5d1c1..535a396762 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -489,6 +489,10 @@ cache: attributes: # make sure that if cache.type is 'redis' and cache.attributes.enabled is 'true' if you change 'maxmemory-policy' Redis config property to 'allkeys-lru', 'allkeys-lfu' or 'allkeys-random' enabled: "${CACHE_ATTRIBUTES_ENABLED:true}" + ts_latest: + # Will enable cache-aside strategy for SQL timeseries latest DAO. + # make sure that if cache.type is 'redis' and cache.ts_latest.enabled is 'true' if you change 'maxmemory-policy' Redis config property to 'allkeys-lru', 'allkeys-lfu' or 'allkeys-random' + enabled: "${CACHE_TS_LATEST_ENABLED:true}" specs: relations: timeToLiveInMinutes: "${CACHE_SPECS_RELATIONS_TTL:1440}" # Relations cache TTL @@ -539,6 +543,9 @@ cache: attributes: timeToLiveInMinutes: "${CACHE_SPECS_ATTRIBUTES_TTL:1440}" # Attributes cache TTL maxSize: "${CACHE_SPECS_ATTRIBUTES_MAX_SIZE:100000}" # 0 means the cache is disabled + tsLatest: + timeToLiveInMinutes: "${CACHE_SPECS_TS_LATEST_TTL:1440}" # Timeseries latest cache TTL + maxSize: "${CACHE_SPECS_TS_LATEST_MAX_SIZE:100000}" # 0 means the cache is disabled userSessionsInvalidation: # The value of this TTL is ignored and replaced by the JWT refresh token expiration time timeToLiveInMinutes: "0" diff --git a/build.sh b/build.sh new file mode 100755 index 0000000000..2c6a7d23fa --- /dev/null +++ b/build.sh @@ -0,0 +1,57 @@ +#!/bin/bash +# +# Copyright © 2016-2024 The Thingsboard Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +set -e # exit on any error + +#PROJECTS="msa/tb-node,msa/web-ui,rule-engine-pe/rule-node-twilio-sms" +PROJECTS="" + +if [ "$1" ]; then + PROJECTS="--projects $1" +fi + +echo "Building and pushing [amd64,arm64] projects '$PROJECTS' ..." +echo "HELP: usage ./build.sh [projects]" +echo "HELP: example ./build.sh msa/web-ui,msa/web-report" +java -version +#echo "Cleaning ui-ngx/node_modules" && rm -rf ui-ngx/node_modules + +MAVEN_OPTS="-Xmx1024m" NODE_OPTIONS="--max_old_space_size=4096" DOCKER_CLI_EXPERIMENTAL=enabled DOCKER_BUILDKIT=0 \ +mvn -T2 license:format clean install -DskipTests \ + $PROJECTS --also-make +# \ +# -Dpush-docker-amd-arm-images +# -Ddockerfile.skip=false -Dpush-docker-image=true +# --offline +# --projects '!msa/web-report' --also-make + +# push all +# mvn -T 1C license:format clean install -DskipTests -Ddockerfile.skip=false -Dpush-docker-image=true + + +## Build and push AMD and ARM docker images using docker buildx +## Reference to article how to setup docker miltiplatform build environment: https://medium.com/@artur.klauser/building-multi-architecture-docker-images-with-buildx-27d80f7e2408 +## install docker-ce from docker repo https://docs.docker.com/engine/install/ubuntu/ +# sudo apt install -y qemu-user-static binfmt-support +# export DOCKER_CLI_EXPERIMENTAL=enabled +# docker version +# docker run --rm --privileged multiarch/qemu-user-static --reset -p yes +# docker buildx create --name mybuilder +# docker buildx use mybuilder +# docker buildx inspect --bootstrap +# docker buildx ls +# mvn clean install -P push-docker-amd-arm-images \ No newline at end of file diff --git a/common/cache/src/main/java/org/thingsboard/server/cache/TbJavaRedisSerializer.java b/common/cache/src/main/java/org/thingsboard/server/cache/TbJavaRedisSerializer.java new file mode 100644 index 0000000000..92c9f37900 --- /dev/null +++ b/common/cache/src/main/java/org/thingsboard/server/cache/TbJavaRedisSerializer.java @@ -0,0 +1,35 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.cache; + +import org.springframework.data.redis.serializer.RedisSerializer; +import org.springframework.data.redis.serializer.SerializationException; + +public class TbJavaRedisSerializer implements TbRedisSerializer { + + final RedisSerializer serializer = RedisSerializer.java(); + + @Override + public byte[] serialize(V value) throws SerializationException { + return serializer.serialize(value); + } + + @Override + public V deserialize(K key, byte[] bytes) throws SerializationException { + return (V) serializer.deserialize(bytes); + } + +} diff --git a/common/dao-api/src/main/java/org/thingsboard/server/dao/util/SqlTsLatestAnyDaoCachedRedis.java b/common/dao-api/src/main/java/org/thingsboard/server/dao/util/SqlTsLatestAnyDaoCachedRedis.java new file mode 100644 index 0000000000..634302a1f3 --- /dev/null +++ b/common/dao-api/src/main/java/org/thingsboard/server/dao/util/SqlTsLatestAnyDaoCachedRedis.java @@ -0,0 +1,26 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.util; + +import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; + +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; + +@Retention(RetentionPolicy.RUNTIME) +@ConditionalOnExpression("('${database.ts_latest.type}'=='sql' || '${database.ts_latest.type}'=='timescale') && '${cache.ts_latest.enabled:false}'=='true' && '${cache.type:caffeine}'=='redis' ") +public @interface SqlTsLatestAnyDaoCachedRedis { +} diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/CacheConstants.java b/common/data/src/main/java/org/thingsboard/server/common/data/CacheConstants.java index 9fda1c6bb5..50e02fca0a 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/CacheConstants.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/CacheConstants.java @@ -34,6 +34,7 @@ public class CacheConstants { public static final String ASSET_PROFILE_CACHE = "assetProfiles"; public static final String ATTRIBUTES_CACHE = "attributes"; + public static final String TS_LATEST_CACHE = "tsLatest"; public static final String USERS_SESSION_INVALIDATION_CACHE = "userSessionsInvalidation"; public static final String OTA_PACKAGE_CACHE = "otaPackages"; public static final String OTA_PACKAGE_DATA_CACHE = "otaPackagesData"; diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/CachedRedisSqlTimeseriesLatestDao.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/CachedRedisSqlTimeseriesLatestDao.java new file mode 100644 index 0000000000..6b32808f7d --- /dev/null +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/CachedRedisSqlTimeseriesLatestDao.java @@ -0,0 +1,104 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.sqlts; + +import com.google.common.util.concurrent.ListenableFuture; +import jakarta.annotation.PostConstruct; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.context.annotation.Primary; +import org.springframework.stereotype.Component; +import org.thingsboard.server.cache.TbTransactionalCache; +import org.thingsboard.server.common.data.id.DeviceProfileId; +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.kv.DeleteTsKvQuery; +import org.thingsboard.server.common.data.kv.TsKvEntry; +import org.thingsboard.server.common.data.kv.TsKvLatestRemovingResult; +import org.thingsboard.server.common.stats.DefaultCounter; +import org.thingsboard.server.common.stats.StatsFactory; +import org.thingsboard.server.dao.cache.CacheExecutorService; +import org.thingsboard.server.dao.timeseries.TimeseriesLatestDao; +import org.thingsboard.server.dao.timeseries.TsLatestCacheKey; +import org.thingsboard.server.dao.util.SqlTsLatestAnyDaoCachedRedis; + +import java.util.List; +import java.util.Optional; + +@Slf4j +@Component +@SqlTsLatestAnyDaoCachedRedis +@RequiredArgsConstructor +@Primary +public class CachedRedisSqlTimeseriesLatestDao extends BaseAbstractSqlTimeseriesDao implements TimeseriesLatestDao { + public static final String STATS_NAME = "ts_latest.cache"; + + DefaultCounter hitCounter; + DefaultCounter missCounter; + + final CacheExecutorService cacheExecutorService; + final SqlTimeseriesLatestDao sqlDao; + final StatsFactory statsFactory; + final TbTransactionalCache cache; + + @PostConstruct + public void init() { + log.info("Init Redis cache-aside SQL Timeseries Latest DAO"); + this.hitCounter = statsFactory.createDefaultCounter(STATS_NAME, "result", "hit"); + this.missCounter = statsFactory.createDefaultCounter(STATS_NAME, "result", "miss"); + } + + @Override + public ListenableFuture saveLatest(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry) { + return sqlDao.saveLatest(tenantId, entityId, tsKvEntry); + } + + @Override + public ListenableFuture removeLatest(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) { + return sqlDao.removeLatest(tenantId, entityId, query); + } + + @Override + public ListenableFuture> findLatestOpt(TenantId tenantId, EntityId entityId, String key) { + return sqlDao.findLatestOpt(tenantId, entityId, key); + } + + @Override + public ListenableFuture findLatest(TenantId tenantId, EntityId entityId, String key) { + return sqlDao.findLatest(tenantId, entityId, key); + } + + @Override + public TsKvEntry findLatestSync(TenantId tenantId, EntityId entityId, String key) { + return sqlDao.findLatestSync(tenantId, entityId, key); + } + + @Override + public ListenableFuture> findAllLatest(TenantId tenantId, EntityId entityId) { + return sqlDao.findAllLatest(tenantId, entityId); + } + + @Override + public List findAllKeysByDeviceProfileId(TenantId tenantId, DeviceProfileId deviceProfileId) { + return sqlDao.findAllKeysByDeviceProfileId(tenantId, deviceProfileId); + } + + @Override + public List findAllKeysByEntityIds(TenantId tenantId, List entityIds) { + return sqlDao.findAllKeysByEntityIds(tenantId, entityIds); + } + +} diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TsLatestCacheKey.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TsLatestCacheKey.java new file mode 100644 index 0000000000..adb572922a --- /dev/null +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TsLatestCacheKey.java @@ -0,0 +1,40 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.timeseries; + +import lombok.AllArgsConstructor; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import org.thingsboard.server.common.data.AttributeScope; +import org.thingsboard.server.common.data.id.EntityId; + +import java.io.Serial; +import java.io.Serializable; + +@EqualsAndHashCode +@Getter +@AllArgsConstructor +public class TsLatestCacheKey implements Serializable { + private static final long serialVersionUID = 2024369077925351881L; + + private final EntityId entityId; + private final String key; + + @Override + public String toString() { + return "{" + entityId + "}" + key; + } +} diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TsLatestRedisCache.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TsLatestRedisCache.java new file mode 100644 index 0000000000..d67e9272e3 --- /dev/null +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TsLatestRedisCache.java @@ -0,0 +1,35 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.timeseries; + +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.data.redis.connection.RedisConnectionFactory; +import org.springframework.stereotype.Service; +import org.thingsboard.server.cache.CacheSpecsMap; +import org.thingsboard.server.cache.RedisTbTransactionalCache; +import org.thingsboard.server.cache.TBRedisCacheConfiguration; +import org.thingsboard.server.cache.TbJavaRedisSerializer; +import org.thingsboard.server.common.data.CacheConstants; +import org.thingsboard.server.common.data.kv.TsKvEntry; + +@ConditionalOnProperty(prefix = "cache", value = "type", havingValue = "redis") +@Service("TsLatestCache") +public class TsLatestRedisCache extends RedisTbTransactionalCache { + + public TsLatestRedisCache(TBRedisCacheConfiguration configuration, CacheSpecsMap cacheSpecsMap, RedisConnectionFactory connectionFactory) { + super(CacheConstants.TS_LATEST_CACHE, cacheSpecsMap, connectionFactory, configuration, new TbJavaRedisSerializer<>()); } + +} diff --git a/dao/src/test/resources/application-test.properties b/dao/src/test/resources/application-test.properties index 73c1495af8..c34d0ad592 100644 --- a/dao/src/test/resources/application-test.properties +++ b/dao/src/test/resources/application-test.properties @@ -10,6 +10,7 @@ audit-log.sink.type=none #cache.type=caffeine # will be injected redis by RedisContainer or will be default (caffeine) cache.maximumPoolSize=16 cache.attributes.enabled=true +cache.ts_latest.enabled=true cache.specs.relations.timeToLiveInMinutes=1440 cache.specs.relations.maxSize=100000 @@ -53,6 +54,9 @@ cache.specs.assetProfiles.maxSize=100000 cache.specs.attributes.timeToLiveInMinutes=1440 cache.specs.attributes.maxSize=100000 +cache.specs.tsLatest.timeToLiveInMinutes=1440 +cache.specs.tsLatest.maxSize=100000 + cache.specs.tokensOutdatageTime.timeToLiveInMinutes=1440 cache.specs.tokensOutdatageTime.maxSize=100000 diff --git a/dao/src/test/resources/logback.xml b/dao/src/test/resources/logback.xml index 5e293b2982..8d4951658e 100644 --- a/dao/src/test/resources/logback.xml +++ b/dao/src/test/resources/logback.xml @@ -9,6 +9,7 @@ + From 44096a7cb7059c1f3d847d5af736dbe4c4c4a7d5 Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Mon, 8 Apr 2024 14:57:20 +0200 Subject: [PATCH 04/16] TS latest dao put to Redis WIP --- .../cache/RedisTbTransactionalCache.java | 6 ++++- .../CachedRedisSqlTimeseriesLatestDao.java | 22 ++++++++++++++++++- .../dao/timeseries/TsLatestRedisCache.java | 19 ++++++++++++++-- .../{logback.xml => logback-test.xml} | 2 ++ 4 files changed, 45 insertions(+), 4 deletions(-) rename dao/src/test/resources/{logback.xml => logback-test.xml} (87%) diff --git a/common/cache/src/main/java/org/thingsboard/server/cache/RedisTbTransactionalCache.java b/common/cache/src/main/java/org/thingsboard/server/cache/RedisTbTransactionalCache.java index cd5db13995..bb5af26c3e 100644 --- a/common/cache/src/main/java/org/thingsboard/server/cache/RedisTbTransactionalCache.java +++ b/common/cache/src/main/java/org/thingsboard/server/cache/RedisTbTransactionalCache.java @@ -214,8 +214,12 @@ public abstract class RedisTbTransactionalCache saveLatest(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry) { - return sqlDao.saveLatest(tenantId, entityId, tsKvEntry); + ListenableFuture future = sqlDao.saveLatest(tenantId, entityId, tsKvEntry); + future = Futures.transform(future, x -> { + cache.put(new TsLatestCacheKey(entityId, tsKvEntry.getKey()), tsKvEntry); + return x; + }, + cacheExecutorService); + Futures.addCallback(future, new FutureCallback() { + @Override + public void onSuccess(Void result) { + log.trace("saveLatest onSuccess [{}][{}][{}]", entityId, tsKvEntry.getKey(), tsKvEntry); + } + + @Override + public void onFailure(Throwable t) { + log.trace("saveLatest onFailure [{}][{}][{}]", entityId, tsKvEntry.getKey(), tsKvEntry, t); + } + }, MoreExecutors.directExecutor()); + return future; } @Override diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TsLatestRedisCache.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TsLatestRedisCache.java index d67e9272e3..c0bca607c9 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TsLatestRedisCache.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TsLatestRedisCache.java @@ -15,8 +15,10 @@ */ package org.thingsboard.server.dao.timeseries; +import lombok.extern.slf4j.Slf4j; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.data.redis.connection.RedisConnectionFactory; +import org.springframework.data.redis.connection.RedisStringCommands; import org.springframework.stereotype.Service; import org.thingsboard.server.cache.CacheSpecsMap; import org.thingsboard.server.cache.RedisTbTransactionalCache; @@ -25,11 +27,24 @@ import org.thingsboard.server.cache.TbJavaRedisSerializer; import org.thingsboard.server.common.data.CacheConstants; import org.thingsboard.server.common.data.kv.TsKvEntry; +import java.io.Serializable; + @ConditionalOnProperty(prefix = "cache", value = "type", havingValue = "redis") @Service("TsLatestCache") -public class TsLatestRedisCache extends RedisTbTransactionalCache { +@Slf4j +public class TsLatestRedisCache extends RedisTbTransactionalCache { public TsLatestRedisCache(TBRedisCacheConfiguration configuration, CacheSpecsMap cacheSpecsMap, RedisConnectionFactory connectionFactory) { - super(CacheConstants.TS_LATEST_CACHE, cacheSpecsMap, connectionFactory, configuration, new TbJavaRedisSerializer<>()); } + super(CacheConstants.TS_LATEST_CACHE, cacheSpecsMap, connectionFactory, configuration, new TbJavaRedisSerializer<>()); + } + + @Override + public void put(TsLatestCacheKey key, TsKvEntry value) { + log.trace("put [{}][{}]", key, value); + final byte[] rawKey = getRawKey(key); + try (var connection = getConnection(rawKey)) { + put(connection, rawKey, value, RedisStringCommands.SetOption.UPSERT); + } + } } diff --git a/dao/src/test/resources/logback.xml b/dao/src/test/resources/logback-test.xml similarity index 87% rename from dao/src/test/resources/logback.xml rename to dao/src/test/resources/logback-test.xml index 8d4951658e..d7cff4b67b 100644 --- a/dao/src/test/resources/logback.xml +++ b/dao/src/test/resources/logback-test.xml @@ -10,6 +10,8 @@ + + From 3003fccf3b12f4f390b6e7860acbe1b5fc43db3b Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Tue, 16 Apr 2024 16:43:20 +0200 Subject: [PATCH 05/16] BaseTimeseriesServiceTest refactored --- .../timeseries/BaseTimeseriesServiceTest.java | 31 +++---------------- 1 file changed, 4 insertions(+), 27 deletions(-) diff --git a/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java b/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java index 390cb9afac..58b28872be 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java +++ b/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java @@ -17,6 +17,7 @@ package org.thingsboard.server.dao.service.timeseries; import com.datastax.oss.driver.api.core.uuid.Uuids; import lombok.extern.slf4j.Slf4j; +import org.assertj.core.data.Offset; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -32,6 +33,7 @@ import org.thingsboard.server.common.data.kv.BaseDeleteTsKvQuery; import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery; import org.thingsboard.server.common.data.kv.BasicTsKvEntry; import org.thingsboard.server.common.data.kv.BooleanDataEntry; +import org.thingsboard.server.common.data.kv.DataType; import org.thingsboard.server.common.data.kv.DoubleDataEntry; import org.thingsboard.server.common.data.kv.JsonDataEntry; import org.thingsboard.server.common.data.kv.KvEntry; @@ -50,15 +52,14 @@ import java.util.Arrays; import java.util.Collections; import java.util.Comparator; import java.util.List; +import java.util.Optional; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; /** * @author Andrew Shvayka @@ -89,6 +90,7 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest { KvEntry booleanKvEntry = new BooleanDataEntry(BOOLEAN_KEY, Boolean.TRUE); protected TenantId tenantId; + DeviceId deviceId = new DeviceId(Uuids.timeBased()); @Before public void before() { @@ -106,8 +108,6 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest { @Test public void testFindAllLatest() throws Exception { - DeviceId deviceId = new DeviceId(Uuids.timeBased()); - saveEntries(deviceId, TS - 2); saveEntries(deviceId, TS - 1); saveEntries(deviceId, TS); @@ -150,8 +150,6 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest { @Test public void testFindLatest() throws Exception { - DeviceId deviceId = new DeviceId(Uuids.timeBased()); - saveEntries(deviceId, TS - 2); saveEntries(deviceId, TS - 1); saveEntries(deviceId, TS); @@ -163,8 +161,6 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest { @Test public void testFindLatestWithoutLatestUpdate() throws Exception { - DeviceId deviceId = new DeviceId(Uuids.timeBased()); - saveEntries(deviceId, TS - 2); saveEntries(deviceId, TS - 1); saveEntriesWithoutLatest(deviceId, TS); @@ -176,8 +172,6 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest { @Test public void testFindByQueryAscOrder() throws Exception { - DeviceId deviceId = new DeviceId(Uuids.timeBased()); - saveEntries(deviceId, TS - 3); saveEntries(deviceId, TS - 2); saveEntries(deviceId, TS - 1); @@ -202,7 +196,6 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest { @Test public void testFindByQuery_whenPeriodEqualsOneMilisecondPeriod() throws Exception { - DeviceId deviceId = new DeviceId(Uuids.timeBased()); saveEntries(deviceId, TS - 1L); saveEntries(deviceId, TS); saveEntries(deviceId, TS + 1L); @@ -222,7 +215,6 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest { @Test public void testFindByQuery_whenPeriodEqualsInterval() throws Exception { - DeviceId deviceId = new DeviceId(Uuids.timeBased()); saveEntries(deviceId, TS - 1L); for (long i = TS; i <= TS + 100L; i += 10L) { saveEntries(deviceId, i); @@ -244,7 +236,6 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest { @Test public void testFindByQuery_whenPeriodHaveTwoIntervalWithEqualsLength() throws Exception { - DeviceId deviceId = new DeviceId(Uuids.timeBased()); saveEntries(deviceId, TS - 1L); for (long i = TS; i <= TS + 100000L; i += 10000L) { saveEntries(deviceId, i); @@ -268,7 +259,6 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest { @Test public void testFindByQuery_whenPeriodHaveTwoInterval_whereSecondShorterThanFirst() throws Exception { - DeviceId deviceId = new DeviceId(Uuids.timeBased()); saveEntries(deviceId, TS - 1L); for (long i = TS; i <= TS + 80000L; i += 10000L) { saveEntries(deviceId, i); @@ -292,7 +282,6 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest { @Test public void testFindByQuery_whenPeriodHaveTwoIntervalWithEqualsLength_whereNotAllEntriesInRange() throws Exception { - DeviceId deviceId = new DeviceId(Uuids.timeBased()); for (long i = TS - 1L; i <= TS + 100000L + 1L; i += 10000) { saveEntries(deviceId, i); } @@ -314,7 +303,6 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest { @Test public void testFindByQuery_whenPeriodHaveTwoInterval_whereSecondShorterThanFirst_andNotAllEntriesInRange() throws Exception { - DeviceId deviceId = new DeviceId(Uuids.timeBased()); for (long i = TS - 1L; i <= TS + 100000L + 1L; i += 10000L) { saveEntries(deviceId, i); } @@ -336,8 +324,6 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest { @Test public void testFindByQueryDescOrder() throws Exception { - DeviceId deviceId = new DeviceId(Uuids.timeBased()); - saveEntries(deviceId, TS - 3); saveEntries(deviceId, TS - 2); saveEntries(deviceId, TS - 1); @@ -362,7 +348,6 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest { @Test public void testFindAllByQueries_verifyQueryId() throws Exception { - DeviceId deviceId = new DeviceId(Uuids.timeBased()); saveEntries(deviceId, TS); saveEntries(deviceId, TS - 2); saveEntries(deviceId, TS - 10); @@ -373,7 +358,6 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest { @Test public void testFindAllByQueries_verifyQueryId_forEntityView() throws Exception { - DeviceId deviceId = new DeviceId(Uuids.timeBased()); saveEntries(deviceId, TS); saveEntries(deviceId, TS - 2); saveEntries(deviceId, TS - 12); @@ -392,8 +376,6 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest { @Test public void testDeleteDeviceTsDataWithOverwritingLatest() throws Exception { - DeviceId deviceId = new DeviceId(Uuids.timeBased()); - saveEntries(deviceId, 10000); saveEntries(deviceId, 20000); saveEntries(deviceId, 30000); @@ -412,7 +394,6 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest { @Test public void testFindDeviceTsData() throws Exception { - DeviceId deviceId = new DeviceId(Uuids.timeBased()); List entries = new ArrayList<>(); entries.add(save(deviceId, 5000, 100)); @@ -563,7 +544,6 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest { @Test public void testFindDeviceLongAndDoubleTsData() throws Exception { - DeviceId deviceId = new DeviceId(Uuids.timeBased()); List entries = new ArrayList<>(); entries.add(save(deviceId, 5000, 100)); @@ -654,8 +634,6 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest { @Test public void testSaveTs_RemoveTs_AndSaveTsAgain() throws Exception { - DeviceId deviceId = new DeviceId(Uuids.timeBased()); - save(deviceId, 2000000L, 95); save(deviceId, 4000000L, 100); save(deviceId, 6000000L, 105); @@ -686,7 +664,6 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest { BasicTsKvEntry jsonEntry = new BasicTsKvEntry(TimeUnit.MINUTES.toMillis(5), new JsonDataEntry("test", "{\"test\":\"testValue\"}")); List timeseries = List.of(booleanEntry, stringEntry, longEntry, doubleEntry, jsonEntry); - DeviceId deviceId = new DeviceId(Uuids.timeBased()); for (TsKvEntry tsKvEntry : timeseries) { save(tenantId, deviceId, tsKvEntry); } From 771d15a6ddc71d1e95511c17c3b03ad006e750c3 Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Tue, 16 Apr 2024 16:43:47 +0200 Subject: [PATCH 06/16] BaseTimeseriesServiceTest: 3 findLatest related tests added --- .../timeseries/BaseTimeseriesServiceTest.java | 30 +++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java b/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java index 58b28872be..83db90ad71 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java +++ b/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java @@ -159,6 +159,36 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest { Assert.assertEquals(toTsEntry(TS, stringKvEntry), entries.get(0)); } + @Test + public void testFindLatestOpt() throws Exception { + saveEntries(deviceId, TS - 2); + saveEntries(deviceId, TS - 1); + saveEntries(deviceId, TS); + + Optional entryOpt = tsService.findLatest(tenantId, deviceId, STRING_KEY).get(MAX_TIMEOUT, TimeUnit.SECONDS); + assertThat(entryOpt).isNotNull().isPresent(); + Assert.assertEquals(toTsEntry(TS, stringKvEntry), entryOpt.get()); + } + + @Test + public void testFindLatest_NotFound() throws Exception { + List entries = tsService.findLatest(tenantId, deviceId, Collections.singleton(STRING_KEY)).get(MAX_TIMEOUT, TimeUnit.SECONDS); + assertThat(entries).hasSize(1); + TsKvEntry tsKvEntry = entries.get(0); + assertThat(tsKvEntry).isNotNull(); + // null ts latest representation + assertThat(tsKvEntry.getKey()).isEqualTo(STRING_KEY); + assertThat(tsKvEntry.getDataType()).isEqualTo(DataType.STRING); + assertThat(tsKvEntry.getValue()).isNull(); + assertThat(tsKvEntry.getTs()).isCloseTo(System.currentTimeMillis(), Offset.offset(TimeUnit.MINUTES.toMillis(1))); + } + + @Test + public void testFindLatestOpt_NotFound() throws Exception { + Optional entryOpt = tsService.findLatest(tenantId, deviceId, STRING_KEY).get(MAX_TIMEOUT, TimeUnit.SECONDS); + assertThat(entryOpt).isNotNull().isNotPresent(); + } + @Test public void testFindLatestWithoutLatestUpdate() throws Exception { saveEntries(deviceId, TS - 2); From 034b480a6ca641f9b2670c146b0a78e8190c72be Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Tue, 16 Apr 2024 16:47:27 +0200 Subject: [PATCH 07/16] RedisTbTransactionalCache refactored to make doGet using the rawKey and the Redis connection precalculated slot based on hash --- .../thingsboard/server/cache/RedisTbTransactionalCache.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/common/cache/src/main/java/org/thingsboard/server/cache/RedisTbTransactionalCache.java b/common/cache/src/main/java/org/thingsboard/server/cache/RedisTbTransactionalCache.java index bb5af26c3e..35ef07f0ce 100644 --- a/common/cache/src/main/java/org/thingsboard/server/cache/RedisTbTransactionalCache.java +++ b/common/cache/src/main/java/org/thingsboard/server/cache/RedisTbTransactionalCache.java @@ -78,7 +78,7 @@ public abstract class RedisTbTransactionalCache get(K key) { try (var connection = connectionFactory.getConnection()) { byte[] rawKey = getRawKey(key); - byte[] rawValue = connection.stringCommands().get(rawKey); + byte[] rawValue = doGet(connection, rawKey); if (rawValue == null) { return null; } else if (Arrays.equals(rawValue, BINARY_NULL_VALUE)) { @@ -95,6 +95,10 @@ public abstract class RedisTbTransactionalCache Date: Tue, 16 Apr 2024 16:50:36 +0200 Subject: [PATCH 08/16] SqlTimeseriesLatestDao refactored to use a common doFindLatestSync for any find request --- .../server/dao/sqlts/SqlTimeseriesLatestDao.java | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/SqlTimeseriesLatestDao.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/SqlTimeseriesLatestDao.java index a9fb85560a..0f773e83d9 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/SqlTimeseriesLatestDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/SqlTimeseriesLatestDao.java @@ -155,17 +155,18 @@ public class SqlTimeseriesLatestDao extends BaseAbstractSqlTimeseriesDao impleme @Override public ListenableFuture> findLatestOpt(TenantId tenantId, EntityId entityId, String key) { - return service.submit(() -> Optional.ofNullable(doFindLatest(entityId, key))); + return service.submit(() -> Optional.ofNullable(doFindLatestSync(entityId, key))); } @Override public ListenableFuture findLatest(TenantId tenantId, EntityId entityId, String key) { - return service.submit(() -> getLatestTsKvEntry(entityId, key)); + log.trace("findLatest [{}][{}][{}]", tenantId, entityId, key); + return service.submit(() -> wrapNullTsKvEntry(key, doFindLatestSync(entityId, key))); } @Override public TsKvEntry findLatestSync(TenantId tenantId, EntityId entityId, String key) { - return getLatestTsKvEntry(entityId, key); + return wrapNullTsKvEntry(key, doFindLatestSync(entityId, key)); } @Override @@ -209,7 +210,7 @@ public class SqlTimeseriesLatestDao extends BaseAbstractSqlTimeseriesDao impleme ReadTsKvQueryResult::getData, MoreExecutors.directExecutor()); } - protected TsKvEntry doFindLatest(EntityId entityId, String key) { + protected TsKvEntry doFindLatestSync(EntityId entityId, String key) { TsKvLatestCompositeKey compositeKey = new TsKvLatestCompositeKey( entityId.getId(), @@ -225,7 +226,7 @@ public class SqlTimeseriesLatestDao extends BaseAbstractSqlTimeseriesDao impleme } protected ListenableFuture getRemoveLatestFuture(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) { - ListenableFuture latestFuture = service.submit(() -> doFindLatest(entityId, query.getKey())); + ListenableFuture latestFuture = service.submit(() -> doFindLatestSync(entityId, query.getKey())); return Futures.transformAsync(latestFuture, latest -> { if (latest == null) { return Futures.immediateFuture(new TsKvLatestRemovingResult(query.getKey(), false)); @@ -266,10 +267,9 @@ public class SqlTimeseriesLatestDao extends BaseAbstractSqlTimeseriesDao impleme return tsLatestQueue.add(latestEntity); } - private TsKvEntry getLatestTsKvEntry(EntityId entityId, String key) { - TsKvEntry latest = doFindLatest(entityId, key); + protected TsKvEntry wrapNullTsKvEntry(final String key, final TsKvEntry latest) { if (latest == null) { - latest = new BasicTsKvEntry(System.currentTimeMillis(), new StringDataEntry(key, null)); + return new BasicTsKvEntry(System.currentTimeMillis(), new StringDataEntry(key, null)); } return latest; } From 551325b8c7afe8afa58ed2586f6d3e6258b0894e Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Tue, 16 Apr 2024 16:52:31 +0200 Subject: [PATCH 09/16] added CachedRedisSqlTimeseriesLatestDao and TsLatestRedisCache (prototype impl with string commands). Caffeine cache not implemented yet --- .../CachedRedisSqlTimeseriesLatestDao.java | 97 +++++++++++++++---- .../dao/timeseries/TsLatestRedisCache.java | 53 ++++++++++ 2 files changed, 133 insertions(+), 17 deletions(-) diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/CachedRedisSqlTimeseriesLatestDao.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/CachedRedisSqlTimeseriesLatestDao.java index b283832186..482b456e60 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/CachedRedisSqlTimeseriesLatestDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/CachedRedisSqlTimeseriesLatestDao.java @@ -24,6 +24,7 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.context.annotation.Primary; import org.springframework.stereotype.Component; +import org.thingsboard.server.cache.TbCacheValueWrapper; import org.thingsboard.server.cache.TbTransactionalCache; import org.thingsboard.server.common.data.id.DeviceProfileId; import org.thingsboard.server.common.data.id.EntityId; @@ -48,14 +49,12 @@ import java.util.Optional; @Primary public class CachedRedisSqlTimeseriesLatestDao extends BaseAbstractSqlTimeseriesDao implements TimeseriesLatestDao { public static final String STATS_NAME = "ts_latest.cache"; - - DefaultCounter hitCounter; - DefaultCounter missCounter; - final CacheExecutorService cacheExecutorService; final SqlTimeseriesLatestDao sqlDao; final StatsFactory statsFactory; final TbTransactionalCache cache; + DefaultCounter hitCounter; + DefaultCounter missCounter; @PostConstruct public void init() { @@ -72,37 +71,101 @@ public class CachedRedisSqlTimeseriesLatestDao extends BaseAbstractSqlTimeseries return x; }, cacheExecutorService); - Futures.addCallback(future, new FutureCallback() { - @Override - public void onSuccess(Void result) { - log.trace("saveLatest onSuccess [{}][{}][{}]", entityId, tsKvEntry.getKey(), tsKvEntry); - } + if (log.isTraceEnabled()) { + Futures.addCallback(future, new FutureCallback<>() { + @Override + public void onSuccess(Void result) { + log.trace("saveLatest onSuccess [{}][{}][{}]", entityId, tsKvEntry.getKey(), tsKvEntry); + } - @Override - public void onFailure(Throwable t) { - log.trace("saveLatest onFailure [{}][{}][{}]", entityId, tsKvEntry.getKey(), tsKvEntry, t); - } - }, MoreExecutors.directExecutor()); + @Override + public void onFailure(Throwable t) { + log.info("saveLatest onFailure [{}][{}][{}]", entityId, tsKvEntry.getKey(), tsKvEntry, t); + } + }, MoreExecutors.directExecutor()); + } return future; } @Override public ListenableFuture removeLatest(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) { - return sqlDao.removeLatest(tenantId, entityId, query); + ListenableFuture future = sqlDao.removeLatest(tenantId, entityId, query); + future = Futures.transform(future, x -> { + cache.evict(new TsLatestCacheKey(entityId, query.getKey())); + return x; + }, + cacheExecutorService); + if (log.isTraceEnabled()) { + Futures.addCallback(future, new FutureCallback<>() { + @Override + public void onSuccess(TsKvLatestRemovingResult result) { + log.trace("removeLatest onSuccess [{}][{}][{}]", entityId, query.getKey(), query); + } + + @Override + public void onFailure(Throwable t) { + log.info("removeLatest onFailure [{}][{}][{}]", entityId, query.getKey(), query, t); + } + }, MoreExecutors.directExecutor()); + } + return future; } @Override public ListenableFuture> findLatestOpt(TenantId tenantId, EntityId entityId, String key) { - return sqlDao.findLatestOpt(tenantId, entityId, key); + log.trace("findLatestOpt"); + return doFindLatest(tenantId, entityId, key); } @Override public ListenableFuture findLatest(TenantId tenantId, EntityId entityId, String key) { - return sqlDao.findLatest(tenantId, entityId, key); + return Futures.transform(doFindLatest(tenantId, entityId, key), x -> sqlDao.wrapNullTsKvEntry(key, x.orElse(null)), MoreExecutors.directExecutor()); + } + + public ListenableFuture> doFindLatest(TenantId tenantId, EntityId entityId, String key) { + final TsLatestCacheKey cacheKey = new TsLatestCacheKey(entityId, key); + ListenableFuture> cacheFuture = cacheExecutorService.submit(() -> cache.get(cacheKey)); + + return Futures.transformAsync(cacheFuture, (cacheValueWrap) -> { + if (cacheValueWrap != null) { + final TsKvEntry tsKvEntry = cacheValueWrap.get(); + log.debug("findLatest cache hit [{}][{}][{}]", entityId, key, tsKvEntry); + return Futures.immediateFuture(Optional.ofNullable(tsKvEntry)); + } + log.debug("findLatest cache miss [{}][{}]", entityId, key); + ListenableFuture> daoFuture = sqlDao.findLatestOpt(tenantId,entityId, key); + + return Futures.transformAsync(daoFuture, (daoValue) -> { + + if (daoValue.isEmpty()) { + //TODO implement the cache logic if no latest found in TS DAO. Currently we are always getting from DB to stay on the safe side + return Futures.immediateFuture(daoValue); + } + ListenableFuture> cachePutFuture = cacheExecutorService.submit(() -> { + cache.put(new TsLatestCacheKey(entityId, key), daoValue.get()); + return daoValue; + }); + + Futures.addCallback(cachePutFuture, new FutureCallback<>() { + @Override + public void onSuccess(Optional result) { + log.trace("saveLatest onSuccess [{}][{}][{}]", entityId, key, result); + } + + @Override + public void onFailure(Throwable t) { + log.info("saveLatest onFailure [{}][{}][{}]", entityId, key, daoValue, t); + } + + }, MoreExecutors.directExecutor()); + return cachePutFuture; + }, MoreExecutors.directExecutor()); + }, MoreExecutors.directExecutor()); } @Override public TsKvEntry findLatestSync(TenantId tenantId, EntityId entityId, String key) { + log.trace("findLatestSync DEPRECATED [{}][{}]", entityId, key); return sqlDao.findLatestSync(tenantId, entityId, key); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TsLatestRedisCache.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TsLatestRedisCache.java index c0bca607c9..61d2c8bdb2 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TsLatestRedisCache.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TsLatestRedisCache.java @@ -16,18 +16,24 @@ package org.thingsboard.server.dao.timeseries; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.NotImplementedException; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.data.redis.connection.RedisConnection; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.connection.RedisStringCommands; import org.springframework.stereotype.Service; import org.thingsboard.server.cache.CacheSpecsMap; import org.thingsboard.server.cache.RedisTbTransactionalCache; import org.thingsboard.server.cache.TBRedisCacheConfiguration; +import org.thingsboard.server.cache.TbCacheTransaction; +import org.thingsboard.server.cache.TbCacheValueWrapper; import org.thingsboard.server.cache.TbJavaRedisSerializer; import org.thingsboard.server.common.data.CacheConstants; import org.thingsboard.server.common.data.kv.TsKvEntry; import java.io.Serializable; +import java.util.Collection; +import java.util.List; @ConditionalOnProperty(prefix = "cache", value = "type", havingValue = "redis") @Service("TsLatestCache") @@ -47,4 +53,51 @@ public class TsLatestRedisCache } } + @Override + public void putIfAbsent(TsLatestCacheKey key, TsKvEntry value) { + log.trace("putIfAbsent [{}][{}]", key, value); + throw new NotImplementedException("putIfAbsent is not supported by TsLatestRedisCache"); + } + + @Override + public TbCacheValueWrapper get(TsLatestCacheKey key) { + log.debug("get [{}]", key); + return super.get(key); + } + + @Override + protected byte[] doGet(RedisConnection connection, byte[] rawKey) { + log.trace("doGet [{}][{}]", connection, rawKey); + return connection.stringCommands().get(rawKey); + } + + @Override + public void evict(TsLatestCacheKey key){ + log.trace("evict [{}]", key); + final byte[] rawKey = getRawKey(key); + try (var connection = getConnection(rawKey)) { + connection.keyCommands().del(rawKey); + } + } + + @Override + public void evict(Collection keys) { + throw new NotImplementedException("evict by many keys is not supported by TsLatestRedisCache"); + } + + @Override + public void evictOrPut(TsLatestCacheKey key, TsKvEntry value) { + throw new NotImplementedException("evictOrPut is not supported by TsLatestRedisCache"); + } + + @Override + public TbCacheTransaction newTransactionForKey(TsLatestCacheKey key) { + throw new NotImplementedException("newTransactionForKey is not supported by TsLatestRedisCache"); + } + + @Override + public TbCacheTransaction newTransactionForKeys(List keys) { + throw new NotImplementedException("newTransactionForKeys is not supported by TsLatestRedisCache"); + } + } From c49d97f7d19d69883a789b2ac2b687f6c34f322a Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Tue, 16 Apr 2024 17:35:04 +0200 Subject: [PATCH 10/16] BaseTimeseriesServiceTest added overwrite TS cases and historical data load case, that fails without condition on update only if ts is the latest --- .../timeseries/BaseTimeseriesServiceTest.java | 34 +++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java b/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java index 83db90ad71..d32933684a 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java +++ b/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java @@ -159,6 +159,40 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest { Assert.assertEquals(toTsEntry(TS, stringKvEntry), entries.get(0)); } + @Test + public void testFindLatestOpt_givenSaveWithHistoricalNonOrderedTS() throws Exception { + save(tenantId, deviceId, toTsEntry(TS - 1, stringKvEntry)); + save(tenantId, deviceId, toTsEntry(TS, stringKvEntry)); + save(tenantId, deviceId, toTsEntry(TS - 10, stringKvEntry)); + save(tenantId, deviceId, toTsEntry(TS - 11, stringKvEntry)); + + Optional entryOpt = tsService.findLatest(tenantId, deviceId, STRING_KEY).get(MAX_TIMEOUT, TimeUnit.SECONDS); + assertThat(entryOpt).isNotNull().isPresent(); + Assert.assertEquals(toTsEntry(TS, stringKvEntry), entryOpt.orElse(null)); + } + + @Test + public void testFindLatestOpt_givenSaveWithSameTSOverwriteValue() throws Exception { + save(tenantId, deviceId, toTsEntry(TS, new StringDataEntry(STRING_KEY, "old"))); + save(tenantId, deviceId, toTsEntry(TS, new StringDataEntry(STRING_KEY, "new"))); + + Optional entryOpt = tsService.findLatest(tenantId, deviceId, STRING_KEY).get(MAX_TIMEOUT, TimeUnit.SECONDS); + assertThat(entryOpt).isNotNull().isPresent(); + Assert.assertEquals(toTsEntry(TS, new StringDataEntry(STRING_KEY, "new")), entryOpt.orElse(null)); + } + + public void testFindLatestOpt_givenSaveWithSameTSOverwriteTypeAndValue() throws Exception { + save(tenantId, deviceId, toTsEntry(TS, new JsonDataEntry("temp", "{\"hello\":\"world\"}"))); + save(tenantId, deviceId, toTsEntry(TS, new BooleanDataEntry("temp", true))); + save(tenantId, deviceId, toTsEntry(TS, new LongDataEntry("temp", 100L))); + save(tenantId, deviceId, toTsEntry(TS, new DoubleDataEntry("temp", Math.PI))); + save(tenantId, deviceId, toTsEntry(TS, new StringDataEntry("temp", "NOOP"))); + + Optional entryOpt = tsService.findLatest(tenantId, deviceId, STRING_KEY).get(MAX_TIMEOUT, TimeUnit.SECONDS); + assertThat(entryOpt).isNotNull().isPresent(); + Assert.assertEquals(toTsEntry(TS, new StringDataEntry("temp", "NOOP")), entryOpt.orElse(null)); + } + @Test public void testFindLatestOpt() throws Exception { saveEntries(deviceId, TS - 2); From a4469f69539ead14b1c926ce103cfdb704741823 Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Wed, 17 Apr 2024 12:14:42 +0200 Subject: [PATCH 11/16] TsLatestRedisCache: added LUA upsert script. load script, eval, evalsha, test for script sha, fetch latest by zRange Redis command --- .../cache/RedisTbTransactionalCache.java | 1 + .../dao/timeseries/TsLatestRedisCache.java | 75 +++++++++++++++---- .../timeseries/TsLatestRedisCacheTest.java | 30 ++++++++ 3 files changed, 91 insertions(+), 15 deletions(-) create mode 100644 dao/src/test/java/org/thingsboard/server/dao/timeseries/TsLatestRedisCacheTest.java diff --git a/common/cache/src/main/java/org/thingsboard/server/cache/RedisTbTransactionalCache.java b/common/cache/src/main/java/org/thingsboard/server/cache/RedisTbTransactionalCache.java index 35ef07f0ce..4277a7bab9 100644 --- a/common/cache/src/main/java/org/thingsboard/server/cache/RedisTbTransactionalCache.java +++ b/common/cache/src/main/java/org/thingsboard/server/cache/RedisTbTransactionalCache.java @@ -51,6 +51,7 @@ public abstract class RedisTbTransactionalCache keySerializer = StringRedisSerializer.UTF_8; private final TbRedisSerializer valueSerializer; diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TsLatestRedisCache.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TsLatestRedisCache.java index 61d2c8bdb2..e9f9ab522c 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TsLatestRedisCache.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TsLatestRedisCache.java @@ -15,12 +15,15 @@ */ package org.thingsboard.server.dao.timeseries; +import jakarta.annotation.PostConstruct; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.NotImplementedException; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.dao.InvalidDataAccessApiUsageException; import org.springframework.data.redis.connection.RedisConnection; import org.springframework.data.redis.connection.RedisConnectionFactory; -import org.springframework.data.redis.connection.RedisStringCommands; +import org.springframework.data.redis.connection.ReturnType; +import org.springframework.data.redis.serializer.StringRedisSerializer; import org.springframework.stereotype.Service; import org.thingsboard.server.cache.CacheSpecsMap; import org.thingsboard.server.cache.RedisTbTransactionalCache; @@ -32,33 +35,42 @@ import org.thingsboard.server.common.data.CacheConstants; import org.thingsboard.server.common.data.kv.TsKvEntry; import java.io.Serializable; +import java.util.Arrays; import java.util.Collection; import java.util.List; +import java.util.Set; @ConditionalOnProperty(prefix = "cache", value = "type", havingValue = "redis") @Service("TsLatestCache") @Slf4j public class TsLatestRedisCache extends RedisTbTransactionalCache { + static final byte[] UPSERT_TS_LATEST_LUA_SCRIPT = StringRedisSerializer.UTF_8.serialize("" + + "redis.call('ZREMRANGEBYSCORE', KEYS[1], ARGV[1], ARGV[1]); " + + "redis.call('ZADD', KEYS[1], ARGV[1], ARGV[2]); " + + "local current_size = redis.call('ZCARD', KEYS[1]); " + + "if current_size > 1 then" + + " redis.call('ZREMRANGEBYRANK', KEYS[1], 0, -2) " + + "end;"); + static final byte[] UPSERT_TS_LATEST_SHA = StringRedisSerializer.UTF_8.serialize("24e226c3ea34e3e850113e8eb1f3cd2b88171988"); + public TsLatestRedisCache(TBRedisCacheConfiguration configuration, CacheSpecsMap cacheSpecsMap, RedisConnectionFactory connectionFactory) { super(CacheConstants.TS_LATEST_CACHE, cacheSpecsMap, connectionFactory, configuration, new TbJavaRedisSerializer<>()); } - @Override - public void put(TsLatestCacheKey key, TsKvEntry value) { - log.trace("put [{}][{}]", key, value); - final byte[] rawKey = getRawKey(key); - try (var connection = getConnection(rawKey)) { - put(connection, rawKey, value, RedisStringCommands.SetOption.UPSERT); + @PostConstruct + public void init() { + try (var connection = getConnection(UPSERT_TS_LATEST_SHA)) { + log.debug("Loading LUA with expected SHA[{}], connection [{}]", new String(UPSERT_TS_LATEST_SHA), connection.getNativeConnection()); + String sha = connection.scriptingCommands().scriptLoad(UPSERT_TS_LATEST_LUA_SCRIPT); + if (!Arrays.equals(UPSERT_TS_LATEST_SHA, StringRedisSerializer.UTF_8.serialize(sha))) { + log.error("SHA for UPSERT_TS_LATEST_LUA_SCRIPT wrong! Expected [{}], but actual [{}], connection [{}]", new String(UPSERT_TS_LATEST_SHA), sha, connection.getNativeConnection()); + } + } catch (Throwable t) { + log.error("Error on Redis TS Latest cache init", t); } } - @Override - public void putIfAbsent(TsLatestCacheKey key, TsKvEntry value) { - log.trace("putIfAbsent [{}][{}]", key, value); - throw new NotImplementedException("putIfAbsent is not supported by TsLatestRedisCache"); - } - @Override public TbCacheValueWrapper get(TsLatestCacheKey key) { log.debug("get [{}]", key); @@ -68,11 +80,38 @@ public class TsLatestRedisCache @Override protected byte[] doGet(RedisConnection connection, byte[] rawKey) { log.trace("doGet [{}][{}]", connection, rawKey); - return connection.stringCommands().get(rawKey); + Set values = connection.commands().zRange(rawKey, -1, -1); + return values == null ? null : values.stream().findFirst().orElse(null); } @Override - public void evict(TsLatestCacheKey key){ + public void put(TsLatestCacheKey key, TsKvEntry value) { + log.trace("put [{}][{}]", key, value); + final byte[] rawKey = getRawKey(key); + try (var connection = getConnection(rawKey)) { + byte[] rawValue = getRawValue(value); + byte[] ts = StringRedisSerializer.UTF_8.serialize(String.valueOf(value.toTsValue().getTs())); + try { + connection.scriptingCommands().evalSha(UPSERT_TS_LATEST_SHA, ReturnType.VALUE, 1, rawKey, ts, rawValue); + } catch (InvalidDataAccessApiUsageException e) { + log.debug("loading LUA [{}]", connection.getNativeConnection()); + String sha = connection.scriptingCommands().scriptLoad(UPSERT_TS_LATEST_LUA_SCRIPT); + if (!Arrays.equals(UPSERT_TS_LATEST_SHA, StringRedisSerializer.UTF_8.serialize(sha))) { + log.error("SHA for UPSERT_TS_LATEST_LUA_SCRIPT wrong! Expected [{}], but actual [{}]", new String(UPSERT_TS_LATEST_SHA), sha); + } + try { + connection.scriptingCommands().evalSha(UPSERT_TS_LATEST_SHA, ReturnType.VALUE, 1, rawKey, ts, rawValue); + } catch (InvalidDataAccessApiUsageException ignored) { + log.debug("Slowly executing eval instead of fast evalsha"); + connection.scriptingCommands().eval(UPSERT_TS_LATEST_LUA_SCRIPT, ReturnType.VALUE, 1, rawKey, ts, rawValue); + } + + } + } + } + + @Override + public void evict(TsLatestCacheKey key) { log.trace("evict [{}]", key); final byte[] rawKey = getRawKey(key); try (var connection = getConnection(rawKey)) { @@ -80,6 +119,12 @@ public class TsLatestRedisCache } } + @Override + public void putIfAbsent(TsLatestCacheKey key, TsKvEntry value) { + log.trace("putIfAbsent [{}][{}]", key, value); + throw new NotImplementedException("putIfAbsent is not supported by TsLatestRedisCache"); + } + @Override public void evict(Collection keys) { throw new NotImplementedException("evict by many keys is not supported by TsLatestRedisCache"); diff --git a/dao/src/test/java/org/thingsboard/server/dao/timeseries/TsLatestRedisCacheTest.java b/dao/src/test/java/org/thingsboard/server/dao/timeseries/TsLatestRedisCacheTest.java new file mode 100644 index 0000000000..5de26c15d3 --- /dev/null +++ b/dao/src/test/java/org/thingsboard/server/dao/timeseries/TsLatestRedisCacheTest.java @@ -0,0 +1,30 @@ +package org.thingsboard.server.dao.timeseries; + +import lombok.SneakyThrows; +import org.junit.jupiter.api.Test; + +import java.security.MessageDigest; + +import static org.assertj.core.api.Assertions.assertThat; + +class TsLatestRedisCacheTest { + + @Test + void testUpsertTsLatestLUAScriptHash() { + assertThat(getSHA1(TsLatestRedisCache.UPSERT_TS_LATEST_LUA_SCRIPT)).isEqualTo(new String(TsLatestRedisCache.UPSERT_TS_LATEST_SHA)); + } + + @SneakyThrows + String getSHA1(byte[] script) { + MessageDigest md = MessageDigest.getInstance("SHA-1"); + byte[] hash = md.digest(script); + + StringBuilder sb = new StringBuilder(); + for (byte b : hash) { + sb.append(String.format("%02x", b)); + } + + return sb.toString(); + } + +} \ No newline at end of file From 67c2392cccea99861bb1f869d4dce55f58a81f3a Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Wed, 17 Apr 2024 12:15:22 +0200 Subject: [PATCH 12/16] RedisClusterSqlTestSuite introduced --- .../dao/AbstractRedisClusterContainer.java | 92 +++++++++++++++++++ .../server/dao/RedisClusterSqlTestSuite.java | 31 +++++++ 2 files changed, 123 insertions(+) create mode 100644 dao/src/test/java/org/thingsboard/server/dao/AbstractRedisClusterContainer.java create mode 100644 dao/src/test/java/org/thingsboard/server/dao/RedisClusterSqlTestSuite.java diff --git a/dao/src/test/java/org/thingsboard/server/dao/AbstractRedisClusterContainer.java b/dao/src/test/java/org/thingsboard/server/dao/AbstractRedisClusterContainer.java new file mode 100644 index 0000000000..cfa4c00ddf --- /dev/null +++ b/dao/src/test/java/org/thingsboard/server/dao/AbstractRedisClusterContainer.java @@ -0,0 +1,92 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao; + +import lombok.extern.slf4j.Slf4j; +import org.junit.ClassRule; +import org.junit.rules.ExternalResource; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.Network; +import org.testcontainers.containers.output.OutputFrame; +import redis.clients.jedis.Jedis; + +import java.util.List; +import java.util.concurrent.TimeUnit; + +@Slf4j +public class AbstractRedisClusterContainer { + + static final String nodes = "127.0.0.1:6371,127.0.0.1:6372,127.0.0.1:6373,127.0.0.1:6374,127.0.0.1:6375,127.0.0.1:6376"; + + @ClassRule(order = 0) + public static Network network = Network.newNetwork(); + @ClassRule(order = 1) + public static GenericContainer redis1 = new GenericContainer("bitnami/redis-cluster:latest").withEnv("REDIS_PORT_NUMBER","6371").withNetworkMode("host").withLogConsumer(x->log.warn("{}", ((OutputFrame)x).getUtf8StringWithoutLineEnding())).withEnv("ALLOW_EMPTY_PASSWORD","yes").withEnv("REDIS_NODES",nodes); + @ClassRule(order = 2) + public static GenericContainer redis2 = new GenericContainer("bitnami/redis-cluster:latest").withEnv("REDIS_PORT_NUMBER","6372").withNetworkMode("host").withLogConsumer(x->log.warn("{}", ((OutputFrame)x).getUtf8StringWithoutLineEnding())).withEnv("ALLOW_EMPTY_PASSWORD","yes").withEnv("REDIS_NODES",nodes); + @ClassRule(order = 3) + public static GenericContainer redis3 = new GenericContainer("bitnami/redis-cluster:latest").withEnv("REDIS_PORT_NUMBER","6373").withNetworkMode("host").withLogConsumer(x->log.warn("{}", ((OutputFrame)x).getUtf8StringWithoutLineEnding())).withEnv("ALLOW_EMPTY_PASSWORD","yes").withEnv("REDIS_NODES",nodes); + @ClassRule(order = 4) + public static GenericContainer redis4 = new GenericContainer("bitnami/redis-cluster:latest").withEnv("REDIS_PORT_NUMBER","6374").withNetworkMode("host").withLogConsumer(x->log.warn("{}", ((OutputFrame)x).getUtf8StringWithoutLineEnding())).withEnv("ALLOW_EMPTY_PASSWORD","yes").withEnv("REDIS_NODES",nodes); + @ClassRule(order = 5) + public static GenericContainer redis5 = new GenericContainer("bitnami/redis-cluster:latest").withEnv("REDIS_PORT_NUMBER","6375").withNetworkMode("host").withLogConsumer(x->log.warn("{}", ((OutputFrame)x).getUtf8StringWithoutLineEnding())).withEnv("ALLOW_EMPTY_PASSWORD","yes").withEnv("REDIS_NODES",nodes); + @ClassRule(order = 6) + public static GenericContainer redis6 = new GenericContainer("bitnami/redis-cluster:latest").withEnv("REDIS_PORT_NUMBER","6376").withNetworkMode("host").withLogConsumer(x->log.warn("{}", ((OutputFrame)x).getUtf8StringWithoutLineEnding())).withEnv("ALLOW_EMPTY_PASSWORD","yes").withEnv("REDIS_NODES",nodes); + + + @ClassRule(order = 100) + public static ExternalResource resource = new ExternalResource() { + @Override + protected void before() throws Throwable { + redis1.start(); + redis2.start(); + redis3.start(); + redis4.start(); + redis5.start(); + redis6.start(); + + String clusterCreateCommand = "echo yes | redis-cli --cluster create " + + "127.0.0.1:6371 127.0.0.1:6372 127.0.0.1:6373 127.0.0.1:6374 127.0.0.1:6375 127.0.0.1:6376 " + + "--cluster-replicas 1"; + + log.warn("Command to init Redis Cluster: {}", clusterCreateCommand); + log.warn("Connect to nodes : {}", nodes); + Thread.sleep(TimeUnit.SECONDS.toMillis(5)); + var result = redis6.execInContainer("/bin/sh", "-c", clusterCreateCommand); + log.warn("Init cluster result: {}", result); + + System.setProperty("cache.type", "redis"); + System.setProperty("redis.connection.type", "cluster"); + System.setProperty("redis.cluster.nodes", nodes); + System.setProperty("redis.password", ""); + + System.setProperty("redis.cluster.useDefaultPoolConfig", "false"); + } + + @Override + protected void after() { + redis1.stop(); + redis2.stop(); + redis3.stop(); + redis4.stop(); + redis5.stop(); + redis6.stop(); + List.of("cache.type", "redis.connection.type", "redis.standalone.host", "redis.standalone.port") + .forEach(System.getProperties()::remove); + } + }; + +} diff --git a/dao/src/test/java/org/thingsboard/server/dao/RedisClusterSqlTestSuite.java b/dao/src/test/java/org/thingsboard/server/dao/RedisClusterSqlTestSuite.java new file mode 100644 index 0000000000..be0bcc6fc7 --- /dev/null +++ b/dao/src/test/java/org/thingsboard/server/dao/RedisClusterSqlTestSuite.java @@ -0,0 +1,31 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao; + +import org.junit.extensions.cpsuite.ClasspathSuite; +import org.junit.extensions.cpsuite.ClasspathSuite.ClassnameFilters; +import org.junit.runner.RunWith; + +@RunWith(ClasspathSuite.class) +@ClassnameFilters( + //All the same tests using redis instead of caffeine. + { + "org.thingsboard.server.dao.service.*ServiceSqlTest", + } +) +public class RedisClusterSqlTestSuite extends AbstractRedisClusterContainer { + +} From 743b8b1d801d751fd9fc55e31f5014853a670c5a Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Wed, 17 Apr 2024 12:16:31 +0200 Subject: [PATCH 13/16] logback-test temporary added TRACE for SqlTimeseriesLatestDao and CachedRedisSqlTimeseriesLatestDao --- dao/src/test/resources/logback-test.xml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/dao/src/test/resources/logback-test.xml b/dao/src/test/resources/logback-test.xml index d7cff4b67b..87aa67a67e 100644 --- a/dao/src/test/resources/logback-test.xml +++ b/dao/src/test/resources/logback-test.xml @@ -10,6 +10,8 @@ + + From 168525d3b9e00636e979b36b75538ffb5ea41853 Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Wed, 17 Apr 2024 12:24:16 +0200 Subject: [PATCH 14/16] mvn license:format --- .../dao/timeseries/TsLatestRedisCacheTest.java | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/dao/src/test/java/org/thingsboard/server/dao/timeseries/TsLatestRedisCacheTest.java b/dao/src/test/java/org/thingsboard/server/dao/timeseries/TsLatestRedisCacheTest.java index 5de26c15d3..c463b4630f 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/timeseries/TsLatestRedisCacheTest.java +++ b/dao/src/test/java/org/thingsboard/server/dao/timeseries/TsLatestRedisCacheTest.java @@ -1,3 +1,18 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.thingsboard.server.dao.timeseries; import lombok.SneakyThrows; From 92793360f304687cd48f8dbf5a95834efcda53a1 Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Wed, 17 Apr 2024 12:39:54 +0200 Subject: [PATCH 15/16] CachedRedisSqlTimeseriesLatestDao.java removed deprecated findLatestSync --- .../server/dao/sqlts/CachedRedisSqlTimeseriesLatestDao.java | 6 ------ 1 file changed, 6 deletions(-) diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/CachedRedisSqlTimeseriesLatestDao.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/CachedRedisSqlTimeseriesLatestDao.java index 482b456e60..32d0ee91c9 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/CachedRedisSqlTimeseriesLatestDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/CachedRedisSqlTimeseriesLatestDao.java @@ -163,12 +163,6 @@ public class CachedRedisSqlTimeseriesLatestDao extends BaseAbstractSqlTimeseries }, MoreExecutors.directExecutor()); } - @Override - public TsKvEntry findLatestSync(TenantId tenantId, EntityId entityId, String key) { - log.trace("findLatestSync DEPRECATED [{}][{}]", entityId, key); - return sqlDao.findLatestSync(tenantId, entityId, key); - } - @Override public ListenableFuture> findAllLatest(TenantId tenantId, EntityId entityId) { return sqlDao.findAllLatest(tenantId, entityId); From 20ec210c320648a190d39ad40a78c08a6d4ce172 Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Wed, 17 Apr 2024 12:43:14 +0200 Subject: [PATCH 16/16] AbstractRedisClusterContainer refactored --- .../server/dao/AbstractRedisClusterContainer.java | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/dao/src/test/java/org/thingsboard/server/dao/AbstractRedisClusterContainer.java b/dao/src/test/java/org/thingsboard/server/dao/AbstractRedisClusterContainer.java index cfa4c00ddf..a95f4b8c17 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/AbstractRedisClusterContainer.java +++ b/dao/src/test/java/org/thingsboard/server/dao/AbstractRedisClusterContainer.java @@ -58,21 +58,19 @@ public class AbstractRedisClusterContainer { redis5.start(); redis6.start(); + Thread.sleep(TimeUnit.SECONDS.toMillis(5)); // otherwise not all containers have time to start + String clusterCreateCommand = "echo yes | redis-cli --cluster create " + "127.0.0.1:6371 127.0.0.1:6372 127.0.0.1:6373 127.0.0.1:6374 127.0.0.1:6375 127.0.0.1:6376 " + "--cluster-replicas 1"; - log.warn("Command to init Redis Cluster: {}", clusterCreateCommand); - log.warn("Connect to nodes : {}", nodes); - Thread.sleep(TimeUnit.SECONDS.toMillis(5)); var result = redis6.execInContainer("/bin/sh", "-c", clusterCreateCommand); log.warn("Init cluster result: {}", result); + log.warn("Connect to nodes: {}", nodes); System.setProperty("cache.type", "redis"); System.setProperty("redis.connection.type", "cluster"); System.setProperty("redis.cluster.nodes", nodes); - System.setProperty("redis.password", ""); - System.setProperty("redis.cluster.useDefaultPoolConfig", "false"); } @@ -84,7 +82,7 @@ public class AbstractRedisClusterContainer { redis4.stop(); redis5.stop(); redis6.stop(); - List.of("cache.type", "redis.connection.type", "redis.standalone.host", "redis.standalone.port") + List.of("cache.type", "redis.connection.type", "redis.cluster.nodes", "redis.cluster.useDefaultPoolConfig\"") .forEach(System.getProperties()::remove); } };