Merge pull request #4757 from smatvienko-tb/cache-attribute-service-performance-improvement

Cache attribute service performance improvement
This commit is contained in:
Igor Kulikov 2021-06-18 17:44:31 +03:00 committed by GitHub
commit 26f0e48430
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 146 additions and 7 deletions

View File

@ -332,6 +332,7 @@ actors:
cache:
# caffeine or redis
type: "${CACHE_TYPE:caffeine}"
maximumPoolSize: "${CACHE_MAXIMUM_POOL_SIZE:16}" # max pool size to process futures that calls the external cache
attributes:
# make sure that if cache.type is 'redis' and cache.attributes.enabled is 'true' that you change 'maxmemory-policy' Redis config property to 'allkeys-lru', 'allkeys-lfu' or 'allkeys-random'
enabled: "${CACHE_ATTRIBUTES_ENABLED:true}"

View File

@ -21,7 +21,6 @@ import com.google.common.util.concurrent.MoreExecutors;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.cache.Cache;
import org.springframework.cache.CacheManager;
import org.springframework.context.annotation.Primary;
import org.springframework.stereotype.Service;
import org.thingsboard.server.common.data.EntityType;
@ -32,6 +31,7 @@ import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.KvEntry;
import org.thingsboard.server.common.stats.DefaultCounter;
import org.thingsboard.server.common.stats.StatsFactory;
import org.thingsboard.server.dao.cache.CacheExecutorService;
import org.thingsboard.server.dao.service.Validator;
import java.util.ArrayList;
@ -45,7 +45,6 @@ import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import static org.thingsboard.server.common.data.CacheConstants.ATTRIBUTES_CACHE;
import static org.thingsboard.server.dao.attributes.AttributeUtils.validate;
@Service
@ -59,12 +58,15 @@ public class CachedAttributesService implements AttributesService {
private final AttributesCacheWrapper cacheWrapper;
private final DefaultCounter hitCounter;
private final DefaultCounter missCounter;
private final CacheExecutorService cacheExecutorService;
public CachedAttributesService(AttributesDao attributesDao,
AttributesCacheWrapper cacheWrapper,
StatsFactory statsFactory) {
StatsFactory statsFactory,
CacheExecutorService cacheExecutorService) {
this.attributesDao = attributesDao;
this.cacheWrapper = cacheWrapper;
this.cacheExecutorService = cacheExecutorService;
this.hitCounter = statsFactory.createDefaultCounter(STATS_NAME, "result", "hit");
this.missCounter = statsFactory.createDefaultCounter(STATS_NAME, "result", "miss");
@ -88,7 +90,7 @@ public class CachedAttributesService implements AttributesService {
// TODO: think if it's a good idea to store 'empty' attributes
cacheWrapper.put(attributeCacheKey, foundAttrKvEntry.orElse(null));
return foundAttrKvEntry;
}, MoreExecutors.directExecutor());
}, cacheExecutorService);
}
}
@ -111,7 +113,7 @@ public class CachedAttributesService implements AttributesService {
notFoundAttributeKeys.removeAll(wrappedCachedAttributes.keySet());
ListenableFuture<List<AttributeKvEntry>> result = attributesDao.find(tenantId, entityId, scope, notFoundAttributeKeys);
return Futures.transform(result, foundInDbAttributes -> mergeDbAndCacheAttributes(entityId, scope, cachedAttributes, notFoundAttributeKeys, foundInDbAttributes), MoreExecutors.directExecutor());
return Futures.transform(result, foundInDbAttributes -> mergeDbAndCacheAttributes(entityId, scope, cachedAttributes, notFoundAttributeKeys, foundInDbAttributes), cacheExecutorService);
}
@ -169,7 +171,7 @@ public class CachedAttributesService implements AttributesService {
// TODO: can do if (attributesCache.get() != null) attributesCache.put() instead, but will be more twice more requests to cache
List<String> attributeKeys = attributes.stream().map(KvEntry::getKey).collect(Collectors.toList());
future.addListener(() -> evictAttributesFromCache(tenantId, entityId, scope, attributeKeys), MoreExecutors.directExecutor());
future.addListener(() -> evictAttributesFromCache(tenantId, entityId, scope, attributeKeys), cacheExecutorService);
return future;
}
@ -177,7 +179,7 @@ public class CachedAttributesService implements AttributesService {
public ListenableFuture<List<Void>> removeAll(TenantId tenantId, EntityId entityId, String scope, List<String> attributeKeys) {
validate(entityId, scope);
ListenableFuture<List<Void>> future = attributesDao.removeAll(tenantId, entityId, scope, attributeKeys);
future.addListener(() -> evictAttributesFromCache(tenantId, entityId, scope, attributeKeys), MoreExecutors.directExecutor());
future.addListener(() -> evictAttributesFromCache(tenantId, entityId, scope, attributeKeys), cacheExecutorService);
return future;
}

View File

@ -0,0 +1,48 @@
/**
* ThingsBoard, Inc. ("COMPANY") CONFIDENTIAL
*
* Copyright © 2016-2021 ThingsBoard, Inc. All Rights Reserved.
*
* NOTICE: All information contained herein is, and remains
* the property of ThingsBoard, Inc. and its suppliers,
* if any. The intellectual and technical concepts contained
* herein are proprietary to ThingsBoard, Inc.
* and its suppliers and may be covered by U.S. and Foreign Patents,
* patents in process, and are protected by trade secret or copyright law.
*
* Dissemination of this information or reproduction of this material is strictly forbidden
* unless prior written permission is obtained from COMPANY.
*
* Access to the source code contained herein is hereby forbidden to anyone except current COMPANY employees,
* managers or contractors who have executed Confidentiality and Non-disclosure agreements
* explicitly covering such access.
*
* The copyright notice above does not evidence any actual or intended publication
* or disclosure of this source code, which includes
* information that is confidential and/or proprietary, and is a trade secret, of COMPANY.
* ANY REPRODUCTION, MODIFICATION, DISTRIBUTION, PUBLIC PERFORMANCE,
* OR PUBLIC DISPLAY OF OR THROUGH USE OF THIS SOURCE CODE WITHOUT
* THE EXPRESS WRITTEN CONSENT OF COMPANY IS STRICTLY PROHIBITED,
* AND IN VIOLATION OF APPLICABLE LAWS AND INTERNATIONAL TREATIES.
* THE RECEIPT OR POSSESSION OF THIS SOURCE CODE AND/OR RELATED INFORMATION
* DOES NOT CONVEY OR IMPLY ANY RIGHTS TO REPRODUCE, DISCLOSE OR DISTRIBUTE ITS CONTENTS,
* OR TO MANUFACTURE, USE, OR SELL ANYTHING THAT IT MAY DESCRIBE, IN WHOLE OR IN PART.
*/
package org.thingsboard.server.dao.cache;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.thingsboard.common.util.AbstractListeningExecutor;
@Component
public class CacheExecutorService extends AbstractListeningExecutor {
@Value("${cache.maximumPoolSize}")
private int poolSize;
@Override
protected int getThreadPollSize() {
return poolSize;
}
}

View File

@ -0,0 +1,88 @@
/**
* ThingsBoard, Inc. ("COMPANY") CONFIDENTIAL
*
* Copyright © 2016-2021 ThingsBoard, Inc. All Rights Reserved.
*
* NOTICE: All information contained herein is, and remains
* the property of ThingsBoard, Inc. and its suppliers,
* if any. The intellectual and technical concepts contained
* herein are proprietary to ThingsBoard, Inc.
* and its suppliers and may be covered by U.S. and Foreign Patents,
* patents in process, and are protected by trade secret or copyright law.
*
* Dissemination of this information or reproduction of this material is strictly forbidden
* unless prior written permission is obtained from COMPANY.
*
* Access to the source code contained herein is hereby forbidden to anyone except current COMPANY employees,
* managers or contractors who have executed Confidentiality and Non-disclosure agreements
* explicitly covering such access.
*
* The copyright notice above does not evidence any actual or intended publication
* or disclosure of this source code, which includes
* information that is confidential and/or proprietary, and is a trade secret, of COMPANY.
* ANY REPRODUCTION, MODIFICATION, DISTRIBUTION, PUBLIC PERFORMANCE,
* OR PUBLIC DISPLAY OF OR THROUGH USE OF THIS SOURCE CODE WITHOUT
* THE EXPRESS WRITTEN CONSENT OF COMPANY IS STRICTLY PROHIBITED,
* AND IN VIOLATION OF APPLICABLE LAWS AND INTERNATIONAL TREATIES.
* THE RECEIPT OR POSSESSION OF THIS SOURCE CODE AND/OR RELATED INFORMATION
* DOES NOT CONVEY OR IMPLY ANY RIGHTS TO REPRODUCE, DISCLOSE OR DISTRIBUTE ITS CONTENTS,
* OR TO MANUFACTURE, USE, OR SELL ANYTHING THAT IT MAY DESCRIBE, IN WHOLE OR IN PART.
*/
package org.thingsboard.server.dao.attributes;
import com.google.common.util.concurrent.MoreExecutors;
import org.junit.Test;
import org.thingsboard.server.dao.cache.CacheConfiguration;
import org.thingsboard.server.dao.cache.CacheExecutorService;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.BDDMockito.willCallRealMethod;
import static org.mockito.Mockito.mock;
public class CachedAttributesServiceTest {
public static final String REDIS = "redis";
@Test
public void givenLocalCacheTypeName_whenEquals_thenOK() {
assertThat(CachedAttributesService.LOCAL_CACHE_TYPE, is("caffeine"));
}
@Test
public void givenCacheType_whenGetExecutor_thenDirectExecutor() {
CachedAttributesService cachedAttributesService = mock(CachedAttributesService.class);
CacheExecutorService cacheExecutorService = mock(CacheExecutorService.class);
willCallRealMethod().given(cachedAttributesService).getExecutor(any(), any());
assertThat(cachedAttributesService.getExecutor(null, cacheExecutorService), is(MoreExecutors.directExecutor()));
CacheConfiguration cacheConfiguration = new CacheConfiguration();
cacheConfiguration.setType(null);
assertThat(cachedAttributesService.getExecutor(cacheConfiguration, cacheExecutorService), is(MoreExecutors.directExecutor()));
cacheConfiguration.setType("");
assertThat(cachedAttributesService.getExecutor(cacheConfiguration, cacheExecutorService), is(MoreExecutors.directExecutor()));
cacheConfiguration.setType(CachedAttributesService.LOCAL_CACHE_TYPE);
assertThat(cachedAttributesService.getExecutor(cacheConfiguration, cacheExecutorService), is(MoreExecutors.directExecutor()));
}
@Test
public void givenCacheType_whenGetExecutor_thenReturnCacheExecutorService() {
CachedAttributesService cachedAttributesService = mock(CachedAttributesService.class);
CacheExecutorService cacheExecutorService = mock(CacheExecutorService.class);
willCallRealMethod().given(cachedAttributesService).getExecutor(any(CacheConfiguration.class), any(CacheExecutorService.class));
CacheConfiguration cacheConfiguration = new CacheConfiguration();
cacheConfiguration.setType(REDIS);
assertThat(cachedAttributesService.getExecutor(cacheConfiguration, cacheExecutorService), is(cacheExecutorService));
cacheConfiguration.setType("unknownCacheType");
assertThat(cachedAttributesService.getExecutor(cacheConfiguration, cacheExecutorService), is(cacheExecutorService));
}
}