Save time series strategies: dynamically calculate max number of deduplication intervals
This commit is contained in:
parent
a2095636a0
commit
349554f938
@ -104,7 +104,7 @@ public class TbMsgTimeseriesNode implements TbNode {
|
|||||||
}
|
}
|
||||||
long ts = computeTs(msg, config.isUseServerTs());
|
long ts = computeTs(msg, config.isUseServerTs());
|
||||||
|
|
||||||
TimeseriesSaveRequest.Strategy strategy = determineSaveActions(ts, msg.getOriginator().getId());
|
TimeseriesSaveRequest.Strategy strategy = determineSaveStrategy(ts, msg.getOriginator().getId());
|
||||||
|
|
||||||
// short-circuit
|
// short-circuit
|
||||||
if (!strategy.saveTimeseries() && !strategy.saveLatest() && !strategy.sendWsUpdate()) {
|
if (!strategy.saveTimeseries() && !strategy.saveLatest() && !strategy.sendWsUpdate()) {
|
||||||
@ -144,7 +144,7 @@ public class TbMsgTimeseriesNode implements TbNode {
|
|||||||
return ignoreMetadataTs ? System.currentTimeMillis() : msg.getMetaDataTs();
|
return ignoreMetadataTs ? System.currentTimeMillis() : msg.getMetaDataTs();
|
||||||
}
|
}
|
||||||
|
|
||||||
private TimeseriesSaveRequest.Strategy determineSaveActions(long ts, UUID originatorUuid) {
|
private TimeseriesSaveRequest.Strategy determineSaveStrategy(long ts, UUID originatorUuid) {
|
||||||
if (persistenceSettings instanceof OnEveryMessage) {
|
if (persistenceSettings instanceof OnEveryMessage) {
|
||||||
return TimeseriesSaveRequest.Strategy.SAVE_ALL;
|
return TimeseriesSaveRequest.Strategy.SAVE_ALL;
|
||||||
}
|
}
|
||||||
|
|||||||
@ -30,6 +30,9 @@ final class DeduplicatePersistenceStrategy implements PersistenceStrategy {
|
|||||||
private static final int MIN_DEDUPLICATION_INTERVAL_SECS = 1;
|
private static final int MIN_DEDUPLICATION_INTERVAL_SECS = 1;
|
||||||
private static final int MAX_DEDUPLICATION_INTERVAL_SECS = (int) Duration.ofDays(1L).toSeconds();
|
private static final int MAX_DEDUPLICATION_INTERVAL_SECS = (int) Duration.ofDays(1L).toSeconds();
|
||||||
|
|
||||||
|
private static final int MAX_TOTAL_INTERVALS_DURATION_SECS = (int) Duration.ofDays(2L).toSeconds();
|
||||||
|
private static final int MAX_NUMBER_OF_INTERVALS = 100;
|
||||||
|
|
||||||
private final long deduplicationIntervalMillis;
|
private final long deduplicationIntervalMillis;
|
||||||
private final LoadingCache<Long, Set<UUID>> deduplicationCache;
|
private final LoadingCache<Long, Set<UUID>> deduplicationCache;
|
||||||
|
|
||||||
@ -43,10 +46,19 @@ final class DeduplicatePersistenceStrategy implements PersistenceStrategy {
|
|||||||
deduplicationCache = Caffeine.newBuilder()
|
deduplicationCache = Caffeine.newBuilder()
|
||||||
.softValues()
|
.softValues()
|
||||||
.expireAfterAccess(Duration.ofSeconds(deduplicationIntervalSecs * 10L))
|
.expireAfterAccess(Duration.ofSeconds(deduplicationIntervalSecs * 10L))
|
||||||
.maximumSize(20L)
|
.maximumSize(calculateMaxNumberOfDeduplicationIntervals(deduplicationIntervalSecs))
|
||||||
.build(__ -> Sets.newConcurrentHashSet());
|
.build(__ -> Sets.newConcurrentHashSet());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Calculates the maximum number of deduplication intervals we will store in the cache.
|
||||||
|
* We limit retention to two days to avoid stale data and cap it at 100 intervals to manage memory usage.
|
||||||
|
*/
|
||||||
|
private static long calculateMaxNumberOfDeduplicationIntervals(int deduplicationIntervalSecs) {
|
||||||
|
int numberOfDeduplicationIntervals = MAX_TOTAL_INTERVALS_DURATION_SECS / deduplicationIntervalSecs;
|
||||||
|
return Math.min(numberOfDeduplicationIntervals, MAX_NUMBER_OF_INTERVALS);
|
||||||
|
}
|
||||||
|
|
||||||
@JsonProperty("deduplicationIntervalSecs")
|
@JsonProperty("deduplicationIntervalSecs")
|
||||||
public long getDeduplicationIntervalSecs() {
|
public long getDeduplicationIntervalSecs() {
|
||||||
return Duration.ofMillis(deduplicationIntervalMillis).toSeconds();
|
return Duration.ofMillis(deduplicationIntervalMillis).toSeconds();
|
||||||
|
|||||||
@ -15,10 +15,14 @@
|
|||||||
*/
|
*/
|
||||||
package org.thingsboard.rule.engine.telemetry.strategy;
|
package org.thingsboard.rule.engine.telemetry.strategy;
|
||||||
|
|
||||||
|
import com.github.benmanes.caffeine.cache.LoadingCache;
|
||||||
|
import com.github.benmanes.caffeine.cache.Policy;
|
||||||
import org.junit.jupiter.api.BeforeEach;
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
|
import org.springframework.test.util.ReflectionTestUtils;
|
||||||
|
|
||||||
import java.time.Duration;
|
import java.time.Duration;
|
||||||
|
import java.util.Set;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
|
|
||||||
import static org.assertj.core.api.Assertions.assertThat;
|
import static org.assertj.core.api.Assertions.assertThat;
|
||||||
@ -49,6 +53,57 @@ class DeduplicatePersistenceStrategyTest {
|
|||||||
.hasMessageContaining("Deduplication interval must be at least 1 second(s) and at most 86400 second(s), was 86401 second(s)");
|
.hasMessageContaining("Deduplication interval must be at least 1 second(s) and at most 86400 second(s), was 86401 second(s)");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void shouldNotAllowMoreThan100DeduplicationIntervals() {
|
||||||
|
// GIVEN
|
||||||
|
int deduplicationIntervalSecs = 1; // min deduplication interval duration
|
||||||
|
|
||||||
|
// WHEN
|
||||||
|
strategy = new DeduplicatePersistenceStrategy(deduplicationIntervalSecs);
|
||||||
|
|
||||||
|
// THEN
|
||||||
|
var deduplicationCache = (LoadingCache<Long, Set<UUID>>) ReflectionTestUtils.getField(strategy, "deduplicationCache");
|
||||||
|
|
||||||
|
assertThat(deduplicationCache.policy().eviction())
|
||||||
|
.isPresent()
|
||||||
|
.map(Policy.Eviction::getMaximum)
|
||||||
|
.hasValue(100L);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void shouldCalculateMaxIntervalsAsTwoDaysDividedByIntervalDuration() {
|
||||||
|
// GIVEN
|
||||||
|
int deduplicationIntervalSecs = (int) Duration.ofHours(1L).toSeconds();
|
||||||
|
|
||||||
|
// WHEN
|
||||||
|
strategy = new DeduplicatePersistenceStrategy(deduplicationIntervalSecs);
|
||||||
|
|
||||||
|
// THEN
|
||||||
|
var deduplicationCache = (LoadingCache<Long, Set<UUID>>) ReflectionTestUtils.getField(strategy, "deduplicationCache");
|
||||||
|
|
||||||
|
assertThat(deduplicationCache.policy().eviction())
|
||||||
|
.isPresent()
|
||||||
|
.map(Policy.Eviction::getMaximum)
|
||||||
|
.hasValue(48L);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void shouldKeepAtLeastTwoDeduplicationIntervals() {
|
||||||
|
// GIVEN
|
||||||
|
int deduplicationIntervalSecs = (int) Duration.ofDays(1L).toSeconds(); // max deduplication interval duration
|
||||||
|
|
||||||
|
// WHEN
|
||||||
|
strategy = new DeduplicatePersistenceStrategy(deduplicationIntervalSecs);
|
||||||
|
|
||||||
|
// THEN
|
||||||
|
var deduplicationCache = (LoadingCache<Long, Set<UUID>>) ReflectionTestUtils.getField(strategy, "deduplicationCache");
|
||||||
|
|
||||||
|
assertThat(deduplicationCache.policy().eviction())
|
||||||
|
.isPresent()
|
||||||
|
.map(Policy.Eviction::getMaximum)
|
||||||
|
.hasValue(2L);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
void shouldReturnTrueForFirstCallInInterval() {
|
void shouldReturnTrueForFirstCallInInterval() {
|
||||||
long ts = 1_000_000L;
|
long ts = 1_000_000L;
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user