diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index fcde169fc9..69258a644f 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -332,6 +332,7 @@ actors: cache: # caffeine or redis type: "${CACHE_TYPE:caffeine}" + maximumPoolSize: "${CACHE_MAXIMUM_POOL_SIZE:16}" # max pool size to process futures that calls the external cache attributes: # make sure that if cache.type is 'redis' and cache.attributes.enabled is 'true' that you change 'maxmemory-policy' Redis config property to 'allkeys-lru', 'allkeys-lfu' or 'allkeys-random' enabled: "${CACHE_ATTRIBUTES_ENABLED:true}" diff --git a/dao/src/main/java/org/thingsboard/server/dao/attributes/CachedAttributesService.java b/dao/src/main/java/org/thingsboard/server/dao/attributes/CachedAttributesService.java index 87ddfeee9a..d4e29f02de 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/attributes/CachedAttributesService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/attributes/CachedAttributesService.java @@ -21,7 +21,6 @@ import com.google.common.util.concurrent.MoreExecutors; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.cache.Cache; -import org.springframework.cache.CacheManager; import org.springframework.context.annotation.Primary; import org.springframework.stereotype.Service; import org.thingsboard.server.common.data.EntityType; @@ -32,6 +31,7 @@ import org.thingsboard.server.common.data.kv.AttributeKvEntry; import org.thingsboard.server.common.data.kv.KvEntry; import org.thingsboard.server.common.stats.DefaultCounter; import org.thingsboard.server.common.stats.StatsFactory; +import org.thingsboard.server.dao.cache.CacheExecutorService; import org.thingsboard.server.dao.service.Validator; import java.util.ArrayList; @@ -45,7 +45,6 @@ import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; -import static org.thingsboard.server.common.data.CacheConstants.ATTRIBUTES_CACHE; import static org.thingsboard.server.dao.attributes.AttributeUtils.validate; @Service @@ -59,12 +58,15 @@ public class CachedAttributesService implements AttributesService { private final AttributesCacheWrapper cacheWrapper; private final DefaultCounter hitCounter; private final DefaultCounter missCounter; + private final CacheExecutorService cacheExecutorService; public CachedAttributesService(AttributesDao attributesDao, AttributesCacheWrapper cacheWrapper, - StatsFactory statsFactory) { + StatsFactory statsFactory, + CacheExecutorService cacheExecutorService) { this.attributesDao = attributesDao; this.cacheWrapper = cacheWrapper; + this.cacheExecutorService = cacheExecutorService; this.hitCounter = statsFactory.createDefaultCounter(STATS_NAME, "result", "hit"); this.missCounter = statsFactory.createDefaultCounter(STATS_NAME, "result", "miss"); @@ -88,7 +90,7 @@ public class CachedAttributesService implements AttributesService { // TODO: think if it's a good idea to store 'empty' attributes cacheWrapper.put(attributeCacheKey, foundAttrKvEntry.orElse(null)); return foundAttrKvEntry; - }, MoreExecutors.directExecutor()); + }, cacheExecutorService); } } @@ -111,7 +113,7 @@ public class CachedAttributesService implements AttributesService { notFoundAttributeKeys.removeAll(wrappedCachedAttributes.keySet()); ListenableFuture> result = attributesDao.find(tenantId, entityId, scope, notFoundAttributeKeys); - return Futures.transform(result, foundInDbAttributes -> mergeDbAndCacheAttributes(entityId, scope, cachedAttributes, notFoundAttributeKeys, foundInDbAttributes), MoreExecutors.directExecutor()); + return Futures.transform(result, foundInDbAttributes -> mergeDbAndCacheAttributes(entityId, scope, cachedAttributes, notFoundAttributeKeys, foundInDbAttributes), cacheExecutorService); } @@ -169,7 +171,7 @@ public class CachedAttributesService implements AttributesService { // TODO: can do if (attributesCache.get() != null) attributesCache.put() instead, but will be more twice more requests to cache List attributeKeys = attributes.stream().map(KvEntry::getKey).collect(Collectors.toList()); - future.addListener(() -> evictAttributesFromCache(tenantId, entityId, scope, attributeKeys), MoreExecutors.directExecutor()); + future.addListener(() -> evictAttributesFromCache(tenantId, entityId, scope, attributeKeys), cacheExecutorService); return future; } @@ -177,7 +179,7 @@ public class CachedAttributesService implements AttributesService { public ListenableFuture> removeAll(TenantId tenantId, EntityId entityId, String scope, List attributeKeys) { validate(entityId, scope); ListenableFuture> future = attributesDao.removeAll(tenantId, entityId, scope, attributeKeys); - future.addListener(() -> evictAttributesFromCache(tenantId, entityId, scope, attributeKeys), MoreExecutors.directExecutor()); + future.addListener(() -> evictAttributesFromCache(tenantId, entityId, scope, attributeKeys), cacheExecutorService); return future; } diff --git a/dao/src/main/java/org/thingsboard/server/dao/cache/CacheExecutorService.java b/dao/src/main/java/org/thingsboard/server/dao/cache/CacheExecutorService.java new file mode 100644 index 0000000000..ca00e41d79 --- /dev/null +++ b/dao/src/main/java/org/thingsboard/server/dao/cache/CacheExecutorService.java @@ -0,0 +1,48 @@ +/** + * ThingsBoard, Inc. ("COMPANY") CONFIDENTIAL + * + * Copyright © 2016-2021 ThingsBoard, Inc. All Rights Reserved. + * + * NOTICE: All information contained herein is, and remains + * the property of ThingsBoard, Inc. and its suppliers, + * if any. The intellectual and technical concepts contained + * herein are proprietary to ThingsBoard, Inc. + * and its suppliers and may be covered by U.S. and Foreign Patents, + * patents in process, and are protected by trade secret or copyright law. + * + * Dissemination of this information or reproduction of this material is strictly forbidden + * unless prior written permission is obtained from COMPANY. + * + * Access to the source code contained herein is hereby forbidden to anyone except current COMPANY employees, + * managers or contractors who have executed Confidentiality and Non-disclosure agreements + * explicitly covering such access. + * + * The copyright notice above does not evidence any actual or intended publication + * or disclosure of this source code, which includes + * information that is confidential and/or proprietary, and is a trade secret, of COMPANY. + * ANY REPRODUCTION, MODIFICATION, DISTRIBUTION, PUBLIC PERFORMANCE, + * OR PUBLIC DISPLAY OF OR THROUGH USE OF THIS SOURCE CODE WITHOUT + * THE EXPRESS WRITTEN CONSENT OF COMPANY IS STRICTLY PROHIBITED, + * AND IN VIOLATION OF APPLICABLE LAWS AND INTERNATIONAL TREATIES. + * THE RECEIPT OR POSSESSION OF THIS SOURCE CODE AND/OR RELATED INFORMATION + * DOES NOT CONVEY OR IMPLY ANY RIGHTS TO REPRODUCE, DISCLOSE OR DISTRIBUTE ITS CONTENTS, + * OR TO MANUFACTURE, USE, OR SELL ANYTHING THAT IT MAY DESCRIBE, IN WHOLE OR IN PART. + */ +package org.thingsboard.server.dao.cache; + +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; +import org.thingsboard.common.util.AbstractListeningExecutor; + +@Component +public class CacheExecutorService extends AbstractListeningExecutor { + + @Value("${cache.maximumPoolSize}") + private int poolSize; + + @Override + protected int getThreadPollSize() { + return poolSize; + } + +} diff --git a/dao/src/test/java/org/thingsboard/server/dao/attributes/CachedAttributesServiceTest.java b/dao/src/test/java/org/thingsboard/server/dao/attributes/CachedAttributesServiceTest.java new file mode 100644 index 0000000000..af980a6301 --- /dev/null +++ b/dao/src/test/java/org/thingsboard/server/dao/attributes/CachedAttributesServiceTest.java @@ -0,0 +1,88 @@ +/** + * ThingsBoard, Inc. ("COMPANY") CONFIDENTIAL + * + * Copyright © 2016-2021 ThingsBoard, Inc. All Rights Reserved. + * + * NOTICE: All information contained herein is, and remains + * the property of ThingsBoard, Inc. and its suppliers, + * if any. The intellectual and technical concepts contained + * herein are proprietary to ThingsBoard, Inc. + * and its suppliers and may be covered by U.S. and Foreign Patents, + * patents in process, and are protected by trade secret or copyright law. + * + * Dissemination of this information or reproduction of this material is strictly forbidden + * unless prior written permission is obtained from COMPANY. + * + * Access to the source code contained herein is hereby forbidden to anyone except current COMPANY employees, + * managers or contractors who have executed Confidentiality and Non-disclosure agreements + * explicitly covering such access. + * + * The copyright notice above does not evidence any actual or intended publication + * or disclosure of this source code, which includes + * information that is confidential and/or proprietary, and is a trade secret, of COMPANY. + * ANY REPRODUCTION, MODIFICATION, DISTRIBUTION, PUBLIC PERFORMANCE, + * OR PUBLIC DISPLAY OF OR THROUGH USE OF THIS SOURCE CODE WITHOUT + * THE EXPRESS WRITTEN CONSENT OF COMPANY IS STRICTLY PROHIBITED, + * AND IN VIOLATION OF APPLICABLE LAWS AND INTERNATIONAL TREATIES. + * THE RECEIPT OR POSSESSION OF THIS SOURCE CODE AND/OR RELATED INFORMATION + * DOES NOT CONVEY OR IMPLY ANY RIGHTS TO REPRODUCE, DISCLOSE OR DISTRIBUTE ITS CONTENTS, + * OR TO MANUFACTURE, USE, OR SELL ANYTHING THAT IT MAY DESCRIBE, IN WHOLE OR IN PART. + */ +package org.thingsboard.server.dao.attributes; + +import com.google.common.util.concurrent.MoreExecutors; +import org.junit.Test; +import org.thingsboard.server.dao.cache.CacheConfiguration; +import org.thingsboard.server.dao.cache.CacheExecutorService; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.BDDMockito.willCallRealMethod; +import static org.mockito.Mockito.mock; + +public class CachedAttributesServiceTest { + + public static final String REDIS = "redis"; + + @Test + public void givenLocalCacheTypeName_whenEquals_thenOK() { + assertThat(CachedAttributesService.LOCAL_CACHE_TYPE, is("caffeine")); + } + + @Test + public void givenCacheType_whenGetExecutor_thenDirectExecutor() { + CachedAttributesService cachedAttributesService = mock(CachedAttributesService.class); + CacheExecutorService cacheExecutorService = mock(CacheExecutorService.class); + willCallRealMethod().given(cachedAttributesService).getExecutor(any(), any()); + + assertThat(cachedAttributesService.getExecutor(null, cacheExecutorService), is(MoreExecutors.directExecutor())); + + CacheConfiguration cacheConfiguration = new CacheConfiguration(); + cacheConfiguration.setType(null); + assertThat(cachedAttributesService.getExecutor(cacheConfiguration, cacheExecutorService), is(MoreExecutors.directExecutor())); + + cacheConfiguration.setType(""); + assertThat(cachedAttributesService.getExecutor(cacheConfiguration, cacheExecutorService), is(MoreExecutors.directExecutor())); + + cacheConfiguration.setType(CachedAttributesService.LOCAL_CACHE_TYPE); + assertThat(cachedAttributesService.getExecutor(cacheConfiguration, cacheExecutorService), is(MoreExecutors.directExecutor())); + + } + + @Test + public void givenCacheType_whenGetExecutor_thenReturnCacheExecutorService() { + CachedAttributesService cachedAttributesService = mock(CachedAttributesService.class); + CacheExecutorService cacheExecutorService = mock(CacheExecutorService.class); + willCallRealMethod().given(cachedAttributesService).getExecutor(any(CacheConfiguration.class), any(CacheExecutorService.class)); + + CacheConfiguration cacheConfiguration = new CacheConfiguration(); + cacheConfiguration.setType(REDIS); + assertThat(cachedAttributesService.getExecutor(cacheConfiguration, cacheExecutorService), is(cacheExecutorService)); + + cacheConfiguration.setType("unknownCacheType"); + assertThat(cachedAttributesService.getExecutor(cacheConfiguration, cacheExecutorService), is(cacheExecutorService)); + + } + +} \ No newline at end of file