diff --git a/dao/src/main/java/org/thingsboard/server/dao/attributes/CachedAttributesService.java b/dao/src/main/java/org/thingsboard/server/dao/attributes/CachedAttributesService.java index 87ddfeee9a..92c75760e2 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/attributes/CachedAttributesService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/attributes/CachedAttributesService.java @@ -17,11 +17,9 @@ package org.thingsboard.server.dao.attributes; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.MoreExecutors; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.cache.Cache; -import org.springframework.cache.CacheManager; import org.springframework.context.annotation.Primary; import org.springframework.stereotype.Service; import org.thingsboard.server.common.data.EntityType; @@ -32,6 +30,7 @@ import org.thingsboard.server.common.data.kv.AttributeKvEntry; import org.thingsboard.server.common.data.kv.KvEntry; import org.thingsboard.server.common.stats.DefaultCounter; import org.thingsboard.server.common.stats.StatsFactory; +import org.thingsboard.server.dao.cache.CacheExecutorService; import org.thingsboard.server.dao.service.Validator; import java.util.ArrayList; @@ -45,7 +44,6 @@ import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; -import static org.thingsboard.server.common.data.CacheConstants.ATTRIBUTES_CACHE; import static org.thingsboard.server.dao.attributes.AttributeUtils.validate; @Service @@ -59,12 +57,15 @@ public class CachedAttributesService implements AttributesService { private final AttributesCacheWrapper cacheWrapper; private final DefaultCounter hitCounter; private final DefaultCounter missCounter; + private final CacheExecutorService cacheExecutorService; public CachedAttributesService(AttributesDao attributesDao, AttributesCacheWrapper cacheWrapper, - StatsFactory statsFactory) { + StatsFactory statsFactory, + CacheExecutorService cacheExecutorService) { this.attributesDao = attributesDao; this.cacheWrapper = cacheWrapper; + this.cacheExecutorService = cacheExecutorService; this.hitCounter = statsFactory.createDefaultCounter(STATS_NAME, "result", "hit"); this.missCounter = statsFactory.createDefaultCounter(STATS_NAME, "result", "miss"); @@ -88,7 +89,7 @@ public class CachedAttributesService implements AttributesService { // TODO: think if it's a good idea to store 'empty' attributes cacheWrapper.put(attributeCacheKey, foundAttrKvEntry.orElse(null)); return foundAttrKvEntry; - }, MoreExecutors.directExecutor()); + }, cacheExecutorService); } } @@ -111,7 +112,7 @@ public class CachedAttributesService implements AttributesService { notFoundAttributeKeys.removeAll(wrappedCachedAttributes.keySet()); ListenableFuture> result = attributesDao.find(tenantId, entityId, scope, notFoundAttributeKeys); - return Futures.transform(result, foundInDbAttributes -> mergeDbAndCacheAttributes(entityId, scope, cachedAttributes, notFoundAttributeKeys, foundInDbAttributes), MoreExecutors.directExecutor()); + return Futures.transform(result, foundInDbAttributes -> mergeDbAndCacheAttributes(entityId, scope, cachedAttributes, notFoundAttributeKeys, foundInDbAttributes), cacheExecutorService); } @@ -169,7 +170,7 @@ public class CachedAttributesService implements AttributesService { // TODO: can do if (attributesCache.get() != null) attributesCache.put() instead, but will be more twice more requests to cache List attributeKeys = attributes.stream().map(KvEntry::getKey).collect(Collectors.toList()); - future.addListener(() -> evictAttributesFromCache(tenantId, entityId, scope, attributeKeys), MoreExecutors.directExecutor()); + future.addListener(() -> evictAttributesFromCache(tenantId, entityId, scope, attributeKeys), cacheExecutorService); return future; } @@ -177,7 +178,7 @@ public class CachedAttributesService implements AttributesService { public ListenableFuture> removeAll(TenantId tenantId, EntityId entityId, String scope, List attributeKeys) { validate(entityId, scope); ListenableFuture> future = attributesDao.removeAll(tenantId, entityId, scope, attributeKeys); - future.addListener(() -> evictAttributesFromCache(tenantId, entityId, scope, attributeKeys), MoreExecutors.directExecutor()); + future.addListener(() -> evictAttributesFromCache(tenantId, entityId, scope, attributeKeys), cacheExecutorService); return future; }