From 46a58ca82bb17c4434de39c52e34203dfd8fd417 Mon Sep 17 00:00:00 2001 From: Volodymyr Babak Date: Tue, 3 Jun 2025 12:47:55 +0300 Subject: [PATCH] Edqs - VersionStore - Use local cache instead of caffeine to reduce memory heap size --- .../server/edqs/processor/EdqsProcessor.java | 1 + .../server/edqs/util/VersionsStore.java | 47 +++++++++++++++---- 2 files changed, 39 insertions(+), 9 deletions(-) diff --git a/common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProcessor.java b/common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProcessor.java index 510d2c3a41..0e74cb98fa 100644 --- a/common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProcessor.java +++ b/common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProcessor.java @@ -277,6 +277,7 @@ public class EdqsProcessor implements TbQueueHandler, eventConsumer.awaitStop(); responseTemplate.stop(); stateService.stop(); + versionsStore.shutdown(); } } diff --git a/common/edqs/src/main/java/org/thingsboard/server/edqs/util/VersionsStore.java b/common/edqs/src/main/java/org/thingsboard/server/edqs/util/VersionsStore.java index ba3263eec2..9d4c67c4c2 100644 --- a/common/edqs/src/main/java/org/thingsboard/server/edqs/util/VersionsStore.java +++ b/common/edqs/src/main/java/org/thingsboard/server/edqs/util/VersionsStore.java @@ -15,31 +15,35 @@ */ package org.thingsboard.server.edqs.util; -import com.github.benmanes.caffeine.cache.Cache; -import com.github.benmanes.caffeine.cache.Caffeine; import lombok.extern.slf4j.Slf4j; import org.thingsboard.server.common.data.edqs.EdqsObjectKey; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @Slf4j public class VersionsStore { - private final Cache versions; + private final ConcurrentMap> versions = new ConcurrentHashMap<>(); + private final long expirationMillis; + private final ScheduledExecutorService cleaner = Executors.newSingleThreadScheduledExecutor(); public VersionsStore(int ttlMinutes) { - this.versions = Caffeine.newBuilder() - .expireAfterWrite(ttlMinutes, TimeUnit.MINUTES) - .build(); + this.expirationMillis = TimeUnit.MINUTES.toMillis(ttlMinutes); + startCleanupTask(); } public boolean isNew(EdqsObjectKey key, Long version) { AtomicBoolean isNew = new AtomicBoolean(false); - versions.asMap().compute(key, (k, prevVersion) -> { - if (prevVersion == null || prevVersion <= version) { + versions.compute(key, (k, prevVersion) -> { + if (prevVersion == null || prevVersion.value <= version) { isNew.set(true); - return version; + return new TimedValue<>(version); } else { log.debug("[{}] Version {} is outdated, the latest is {}", key, version, prevVersion); return prevVersion; @@ -48,4 +52,29 @@ public class VersionsStore { return isNew.get(); } + private void startCleanupTask() { + cleaner.scheduleAtFixedRate(() -> { + long now = System.currentTimeMillis(); + for (Map.Entry> entry : versions.entrySet()) { + if (now - entry.getValue().lastUpdated > expirationMillis) { + versions.remove(entry.getKey(), entry.getValue()); + } + } + }, expirationMillis, expirationMillis, TimeUnit.MILLISECONDS); + } + + public void shutdown() { + cleaner.shutdown(); + } + + private static class TimedValue { + private final long lastUpdated; + private final V value; + + public TimedValue(V value) { + this.value = value; + this.lastUpdated = System.currentTimeMillis(); + } + } + }