Save time series strategies: dynamically calculate interval expire after access
This commit is contained in:
parent
349554f938
commit
0131358d3b
@ -20,6 +20,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
|
|||||||
import com.github.benmanes.caffeine.cache.Caffeine;
|
import com.github.benmanes.caffeine.cache.Caffeine;
|
||||||
import com.github.benmanes.caffeine.cache.LoadingCache;
|
import com.github.benmanes.caffeine.cache.LoadingCache;
|
||||||
import com.google.common.collect.Sets;
|
import com.google.common.collect.Sets;
|
||||||
|
import com.google.common.primitives.Longs;
|
||||||
|
|
||||||
import java.time.Duration;
|
import java.time.Duration;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
@ -30,6 +31,10 @@ final class DeduplicatePersistenceStrategy implements PersistenceStrategy {
|
|||||||
private static final int MIN_DEDUPLICATION_INTERVAL_SECS = 1;
|
private static final int MIN_DEDUPLICATION_INTERVAL_SECS = 1;
|
||||||
private static final int MAX_DEDUPLICATION_INTERVAL_SECS = (int) Duration.ofDays(1L).toSeconds();
|
private static final int MAX_DEDUPLICATION_INTERVAL_SECS = (int) Duration.ofDays(1L).toSeconds();
|
||||||
|
|
||||||
|
private static final long MIN_INTERVAL_EXPIRY_MILLIS = Duration.ofMinutes(10L).toMillis();
|
||||||
|
private static final int INTERVAL_EXPIRY_FACTOR = 10;
|
||||||
|
private static final long MAX_INTERVAL_EXPIRY_MILLIS = Duration.ofDays(2L).toMillis();
|
||||||
|
|
||||||
private static final int MAX_TOTAL_INTERVALS_DURATION_SECS = (int) Duration.ofDays(2L).toSeconds();
|
private static final int MAX_TOTAL_INTERVALS_DURATION_SECS = (int) Duration.ofDays(2L).toSeconds();
|
||||||
private static final int MAX_NUMBER_OF_INTERVALS = 100;
|
private static final int MAX_NUMBER_OF_INTERVALS = 100;
|
||||||
|
|
||||||
@ -45,11 +50,22 @@ final class DeduplicatePersistenceStrategy implements PersistenceStrategy {
|
|||||||
deduplicationIntervalMillis = Duration.ofSeconds(deduplicationIntervalSecs).toMillis();
|
deduplicationIntervalMillis = Duration.ofSeconds(deduplicationIntervalSecs).toMillis();
|
||||||
deduplicationCache = Caffeine.newBuilder()
|
deduplicationCache = Caffeine.newBuilder()
|
||||||
.softValues()
|
.softValues()
|
||||||
.expireAfterAccess(Duration.ofSeconds(deduplicationIntervalSecs * 10L))
|
.expireAfterAccess(calculateExpireAfterAccess(deduplicationIntervalSecs))
|
||||||
.maximumSize(calculateMaxNumberOfDeduplicationIntervals(deduplicationIntervalSecs))
|
.maximumSize(calculateMaxNumberOfDeduplicationIntervals(deduplicationIntervalSecs))
|
||||||
.build(__ -> Sets.newConcurrentHashSet());
|
.build(__ -> Sets.newConcurrentHashSet());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Calculates the expire-after-access duration. By default, we keep each deduplication interval
|
||||||
|
* alive for 10 “iterations” (interval duration × 10). However, we never let this drop below
|
||||||
|
* 10 minutes to ensure adequate retention for small intervals, nor exceed 48 hours to prevent
|
||||||
|
* storing stale data in memory.
|
||||||
|
*/
|
||||||
|
private static Duration calculateExpireAfterAccess(int deduplicationIntervalSecs) {
|
||||||
|
long desiredExpiryMillis = Duration.ofSeconds(deduplicationIntervalSecs).toMillis() * INTERVAL_EXPIRY_FACTOR;
|
||||||
|
return Duration.ofMillis(Longs.constrainToRange(desiredExpiryMillis, MIN_INTERVAL_EXPIRY_MILLIS, MAX_INTERVAL_EXPIRY_MILLIS));
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Calculates the maximum number of deduplication intervals we will store in the cache.
|
* Calculates the maximum number of deduplication intervals we will store in the cache.
|
||||||
* We limit retention to two days to avoid stale data and cap it at 100 intervals to manage memory usage.
|
* We limit retention to two days to avoid stale data and cap it at 100 intervals to manage memory usage.
|
||||||
|
|||||||
@ -53,6 +53,57 @@ class DeduplicatePersistenceStrategyTest {
|
|||||||
.hasMessageContaining("Deduplication interval must be at least 1 second(s) and at most 86400 second(s), was 86401 second(s)");
|
.hasMessageContaining("Deduplication interval must be at least 1 second(s) and at most 86400 second(s), was 86401 second(s)");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void shouldUseAtLeastTenMinutesForExpireAfterAccess() {
|
||||||
|
// GIVEN
|
||||||
|
int deduplicationIntervalSecs = 1; // min deduplication interval duration
|
||||||
|
|
||||||
|
// WHEN
|
||||||
|
strategy = new DeduplicatePersistenceStrategy(deduplicationIntervalSecs);
|
||||||
|
|
||||||
|
// THEN
|
||||||
|
var deduplicationCache = (LoadingCache<Long, Set<UUID>>) ReflectionTestUtils.getField(strategy, "deduplicationCache");
|
||||||
|
|
||||||
|
assertThat(deduplicationCache.policy().expireAfterAccess())
|
||||||
|
.isPresent()
|
||||||
|
.map(Policy.FixedExpiration::getExpiresAfter)
|
||||||
|
.hasValue(Duration.ofMinutes(10L));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void shouldCalculateExpireAfterAccessAsIntervalDurationMultipliedByTen() {
|
||||||
|
// GIVEN
|
||||||
|
int deduplicationIntervalSecs = (int) Duration.ofHours(1L).toSeconds(); // max deduplication interval duration
|
||||||
|
|
||||||
|
// WHEN
|
||||||
|
strategy = new DeduplicatePersistenceStrategy(deduplicationIntervalSecs);
|
||||||
|
|
||||||
|
// THEN
|
||||||
|
var deduplicationCache = (LoadingCache<Long, Set<UUID>>) ReflectionTestUtils.getField(strategy, "deduplicationCache");
|
||||||
|
|
||||||
|
assertThat(deduplicationCache.policy().expireAfterAccess())
|
||||||
|
.isPresent()
|
||||||
|
.map(Policy.FixedExpiration::getExpiresAfter)
|
||||||
|
.hasValue(Duration.ofHours(10L));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void shouldUseAtMostTwoDaysForExpireAfterAccess() {
|
||||||
|
// GIVEN
|
||||||
|
int deduplicationIntervalSecs = (int) Duration.ofDays(1L).toSeconds(); // max deduplication interval duration
|
||||||
|
|
||||||
|
// WHEN
|
||||||
|
strategy = new DeduplicatePersistenceStrategy(deduplicationIntervalSecs);
|
||||||
|
|
||||||
|
// THEN
|
||||||
|
var deduplicationCache = (LoadingCache<Long, Set<UUID>>) ReflectionTestUtils.getField(strategy, "deduplicationCache");
|
||||||
|
|
||||||
|
assertThat(deduplicationCache.policy().expireAfterAccess())
|
||||||
|
.isPresent()
|
||||||
|
.map(Policy.FixedExpiration::getExpiresAfter)
|
||||||
|
.hasValue(Duration.ofDays(2L));
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
void shouldNotAllowMoreThan100DeduplicationIntervals() {
|
void shouldNotAllowMoreThan100DeduplicationIntervals() {
|
||||||
// GIVEN
|
// GIVEN
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user