cache: CachedAttributesService replaced directExecutor with CacheExecutorService

This commit is contained in:
Sergey Matvienko 2021-06-11 12:36:51 +03:00
parent e4dd517edd
commit c843b2f09e

View File

@ -17,11 +17,9 @@ package org.thingsboard.server.dao.attributes;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.cache.Cache;
import org.springframework.cache.CacheManager;
import org.springframework.context.annotation.Primary;
import org.springframework.stereotype.Service;
import org.thingsboard.server.common.data.EntityType;
@ -32,6 +30,7 @@ import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.KvEntry;
import org.thingsboard.server.common.stats.DefaultCounter;
import org.thingsboard.server.common.stats.StatsFactory;
import org.thingsboard.server.dao.cache.CacheExecutorService;
import org.thingsboard.server.dao.service.Validator;
import java.util.ArrayList;
@ -45,7 +44,6 @@ import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import static org.thingsboard.server.common.data.CacheConstants.ATTRIBUTES_CACHE;
import static org.thingsboard.server.dao.attributes.AttributeUtils.validate;
@Service
@ -59,12 +57,15 @@ public class CachedAttributesService implements AttributesService {
private final AttributesCacheWrapper cacheWrapper;
private final DefaultCounter hitCounter;
private final DefaultCounter missCounter;
private final CacheExecutorService cacheExecutorService;
public CachedAttributesService(AttributesDao attributesDao,
AttributesCacheWrapper cacheWrapper,
StatsFactory statsFactory) {
StatsFactory statsFactory,
CacheExecutorService cacheExecutorService) {
this.attributesDao = attributesDao;
this.cacheWrapper = cacheWrapper;
this.cacheExecutorService = cacheExecutorService;
this.hitCounter = statsFactory.createDefaultCounter(STATS_NAME, "result", "hit");
this.missCounter = statsFactory.createDefaultCounter(STATS_NAME, "result", "miss");
@ -88,7 +89,7 @@ public class CachedAttributesService implements AttributesService {
// TODO: think if it's a good idea to store 'empty' attributes
cacheWrapper.put(attributeCacheKey, foundAttrKvEntry.orElse(null));
return foundAttrKvEntry;
}, MoreExecutors.directExecutor());
}, cacheExecutorService);
}
}
@ -111,7 +112,7 @@ public class CachedAttributesService implements AttributesService {
notFoundAttributeKeys.removeAll(wrappedCachedAttributes.keySet());
ListenableFuture<List<AttributeKvEntry>> result = attributesDao.find(tenantId, entityId, scope, notFoundAttributeKeys);
return Futures.transform(result, foundInDbAttributes -> mergeDbAndCacheAttributes(entityId, scope, cachedAttributes, notFoundAttributeKeys, foundInDbAttributes), MoreExecutors.directExecutor());
return Futures.transform(result, foundInDbAttributes -> mergeDbAndCacheAttributes(entityId, scope, cachedAttributes, notFoundAttributeKeys, foundInDbAttributes), cacheExecutorService);
}
@ -169,7 +170,7 @@ public class CachedAttributesService implements AttributesService {
// TODO: can do if (attributesCache.get() != null) attributesCache.put() instead, but will be more twice more requests to cache
List<String> attributeKeys = attributes.stream().map(KvEntry::getKey).collect(Collectors.toList());
future.addListener(() -> evictAttributesFromCache(tenantId, entityId, scope, attributeKeys), MoreExecutors.directExecutor());
future.addListener(() -> evictAttributesFromCache(tenantId, entityId, scope, attributeKeys), cacheExecutorService);
return future;
}
@ -177,7 +178,7 @@ public class CachedAttributesService implements AttributesService {
public ListenableFuture<List<Void>> removeAll(TenantId tenantId, EntityId entityId, String scope, List<String> attributeKeys) {
validate(entityId, scope);
ListenableFuture<List<Void>> future = attributesDao.removeAll(tenantId, entityId, scope, attributeKeys);
future.addListener(() -> evictAttributesFromCache(tenantId, entityId, scope, attributeKeys), MoreExecutors.directExecutor());
future.addListener(() -> evictAttributesFromCache(tenantId, entityId, scope, attributeKeys), cacheExecutorService);
return future;
}