Merge pull request #4832 from smatvienko-tb/cache-attribute-service-performance-improvement
Cache attribute service performance improvement (fix for local cache)
This commit is contained in:
		
						commit
						f58fe1647b
					
				@ -19,10 +19,12 @@ import com.google.common.util.concurrent.Futures;
 | 
			
		||||
import com.google.common.util.concurrent.ListenableFuture;
 | 
			
		||||
import com.google.common.util.concurrent.MoreExecutors;
 | 
			
		||||
import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
import org.springframework.beans.factory.annotation.Value;
 | 
			
		||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 | 
			
		||||
import org.springframework.cache.Cache;
 | 
			
		||||
import org.springframework.context.annotation.Primary;
 | 
			
		||||
import org.springframework.stereotype.Service;
 | 
			
		||||
import org.springframework.util.StringUtils;
 | 
			
		||||
import org.thingsboard.server.common.data.EntityType;
 | 
			
		||||
import org.thingsboard.server.common.data.id.DeviceProfileId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.EntityId;
 | 
			
		||||
@ -34,6 +36,7 @@ import org.thingsboard.server.common.stats.StatsFactory;
 | 
			
		||||
import org.thingsboard.server.dao.cache.CacheExecutorService;
 | 
			
		||||
import org.thingsboard.server.dao.service.Validator;
 | 
			
		||||
 | 
			
		||||
import javax.annotation.PostConstruct;
 | 
			
		||||
import java.util.ArrayList;
 | 
			
		||||
import java.util.Collection;
 | 
			
		||||
import java.util.HashMap;
 | 
			
		||||
@ -43,6 +46,7 @@ import java.util.Map;
 | 
			
		||||
import java.util.Objects;
 | 
			
		||||
import java.util.Optional;
 | 
			
		||||
import java.util.Set;
 | 
			
		||||
import java.util.concurrent.Executor;
 | 
			
		||||
import java.util.stream.Collectors;
 | 
			
		||||
 | 
			
		||||
import static org.thingsboard.server.dao.attributes.AttributeUtils.validate;
 | 
			
		||||
@ -53,12 +57,17 @@ import static org.thingsboard.server.dao.attributes.AttributeUtils.validate;
 | 
			
		||||
@Slf4j
 | 
			
		||||
public class CachedAttributesService implements AttributesService {
 | 
			
		||||
    private static final String STATS_NAME = "attributes.cache";
 | 
			
		||||
    public static final String LOCAL_CACHE_TYPE = "caffeine";
 | 
			
		||||
 | 
			
		||||
    private final AttributesDao attributesDao;
 | 
			
		||||
    private final AttributesCacheWrapper cacheWrapper;
 | 
			
		||||
    private final CacheExecutorService cacheExecutorService;
 | 
			
		||||
    private final DefaultCounter hitCounter;
 | 
			
		||||
    private final DefaultCounter missCounter;
 | 
			
		||||
    private final CacheExecutorService cacheExecutorService;
 | 
			
		||||
    private Executor cacheExecutor;
 | 
			
		||||
 | 
			
		||||
    @Value("${cache.type}")
 | 
			
		||||
    private String cacheType;
 | 
			
		||||
 | 
			
		||||
    public CachedAttributesService(AttributesDao attributesDao,
 | 
			
		||||
                                   AttributesCacheWrapper cacheWrapper,
 | 
			
		||||
@ -72,6 +81,25 @@ public class CachedAttributesService implements AttributesService {
 | 
			
		||||
        this.missCounter = statsFactory.createDefaultCounter(STATS_NAME, "result", "miss");
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @PostConstruct
 | 
			
		||||
    public void init() {
 | 
			
		||||
        this.cacheExecutor = getExecutor(cacheType, cacheExecutorService);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * Will return:
 | 
			
		||||
     *   - for the <b>local</b> cache type (cache.type="coffeine"): directExecutor (run callback immediately in the same thread)
 | 
			
		||||
     *   - for the <b>remote</b> cache: dedicated thread pool for the cache IO calls to unblock any caller thread
 | 
			
		||||
     * */
 | 
			
		||||
    Executor getExecutor(String cacheType, CacheExecutorService cacheExecutorService) {
 | 
			
		||||
        if (StringUtils.isEmpty(cacheType) || LOCAL_CACHE_TYPE.equals(cacheType)) {
 | 
			
		||||
            log.info("Going to use directExecutor for the local cache type {}", cacheType);
 | 
			
		||||
            return MoreExecutors.directExecutor();
 | 
			
		||||
        }
 | 
			
		||||
        log.info("Going to use cacheExecutorService for the remote cache type {}", cacheType);
 | 
			
		||||
        return cacheExecutorService;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public ListenableFuture<Optional<AttributeKvEntry>> find(TenantId tenantId, EntityId entityId, String scope, String attributeKey) {
 | 
			
		||||
        validate(entityId, scope);
 | 
			
		||||
@ -90,7 +118,7 @@ public class CachedAttributesService implements AttributesService {
 | 
			
		||||
                // TODO: think if it's a good idea to store 'empty' attributes
 | 
			
		||||
                cacheWrapper.put(attributeCacheKey, foundAttrKvEntry.orElse(null));
 | 
			
		||||
                return foundAttrKvEntry;
 | 
			
		||||
            }, cacheExecutorService);
 | 
			
		||||
            }, cacheExecutor);
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -113,7 +141,7 @@ public class CachedAttributesService implements AttributesService {
 | 
			
		||||
        notFoundAttributeKeys.removeAll(wrappedCachedAttributes.keySet());
 | 
			
		||||
 | 
			
		||||
        ListenableFuture<List<AttributeKvEntry>> result = attributesDao.find(tenantId, entityId, scope, notFoundAttributeKeys);
 | 
			
		||||
        return Futures.transform(result, foundInDbAttributes -> mergeDbAndCacheAttributes(entityId, scope, cachedAttributes, notFoundAttributeKeys, foundInDbAttributes), cacheExecutorService);
 | 
			
		||||
        return Futures.transform(result, foundInDbAttributes -> mergeDbAndCacheAttributes(entityId, scope, cachedAttributes, notFoundAttributeKeys, foundInDbAttributes), cacheExecutor);
 | 
			
		||||
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -171,7 +199,7 @@ public class CachedAttributesService implements AttributesService {
 | 
			
		||||
 | 
			
		||||
        // TODO: can do if (attributesCache.get() != null) attributesCache.put() instead, but will be more twice more requests to cache
 | 
			
		||||
        List<String> attributeKeys = attributes.stream().map(KvEntry::getKey).collect(Collectors.toList());
 | 
			
		||||
        future.addListener(() -> evictAttributesFromCache(tenantId, entityId, scope, attributeKeys), cacheExecutorService);
 | 
			
		||||
        future.addListener(() -> evictAttributesFromCache(tenantId, entityId, scope, attributeKeys), cacheExecutor);
 | 
			
		||||
        return future;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -179,7 +207,7 @@ public class CachedAttributesService implements AttributesService {
 | 
			
		||||
    public ListenableFuture<List<Void>> removeAll(TenantId tenantId, EntityId entityId, String scope, List<String> attributeKeys) {
 | 
			
		||||
        validate(entityId, scope);
 | 
			
		||||
        ListenableFuture<List<Void>> future = attributesDao.removeAll(tenantId, entityId, scope, attributeKeys);
 | 
			
		||||
        future.addListener(() -> evictAttributesFromCache(tenantId, entityId, scope, attributeKeys), cacheExecutorService);
 | 
			
		||||
        future.addListener(() -> evictAttributesFromCache(tenantId, entityId, scope, attributeKeys), cacheExecutor);
 | 
			
		||||
        return future;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -0,0 +1,63 @@
 | 
			
		||||
/**
 | 
			
		||||
 * Copyright © 2016-2021 The Thingsboard Authors
 | 
			
		||||
 *
 | 
			
		||||
 * Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
 * you may not use this file except in compliance with the License.
 | 
			
		||||
 * You may obtain a copy of the License at
 | 
			
		||||
 *
 | 
			
		||||
 *     http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 *
 | 
			
		||||
 * Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
 * See the License for the specific language governing permissions and
 | 
			
		||||
 * limitations under the License.
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.server.dao.attributes;
 | 
			
		||||
 | 
			
		||||
import com.google.common.util.concurrent.MoreExecutors;
 | 
			
		||||
import org.junit.Test;
 | 
			
		||||
import org.thingsboard.server.dao.cache.CacheExecutorService;
 | 
			
		||||
 | 
			
		||||
import static org.hamcrest.CoreMatchers.is;
 | 
			
		||||
import static org.hamcrest.MatcherAssert.assertThat;
 | 
			
		||||
import static org.mockito.ArgumentMatchers.any;
 | 
			
		||||
import static org.mockito.BDDMockito.willCallRealMethod;
 | 
			
		||||
import static org.mockito.Mockito.mock;
 | 
			
		||||
 | 
			
		||||
public class CachedAttributesServiceTest {
 | 
			
		||||
 | 
			
		||||
    public static final String REDIS = "redis";
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    public void givenLocalCacheTypeName_whenEquals_thenOK() {
 | 
			
		||||
        assertThat(CachedAttributesService.LOCAL_CACHE_TYPE, is("caffeine"));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    public void givenCacheType_whenGetExecutor_thenDirectExecutor() {
 | 
			
		||||
        CachedAttributesService cachedAttributesService = mock(CachedAttributesService.class);
 | 
			
		||||
        CacheExecutorService cacheExecutorService = mock(CacheExecutorService.class);
 | 
			
		||||
        willCallRealMethod().given(cachedAttributesService).getExecutor(any(), any());
 | 
			
		||||
 | 
			
		||||
        assertThat(cachedAttributesService.getExecutor(null, cacheExecutorService), is(MoreExecutors.directExecutor()));
 | 
			
		||||
 | 
			
		||||
        assertThat(cachedAttributesService.getExecutor("", cacheExecutorService), is(MoreExecutors.directExecutor()));
 | 
			
		||||
 | 
			
		||||
        assertThat(cachedAttributesService.getExecutor(CachedAttributesService.LOCAL_CACHE_TYPE, cacheExecutorService), is(MoreExecutors.directExecutor()));
 | 
			
		||||
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    public void givenCacheType_whenGetExecutor_thenReturnCacheExecutorService() {
 | 
			
		||||
        CachedAttributesService cachedAttributesService = mock(CachedAttributesService.class);
 | 
			
		||||
        CacheExecutorService cacheExecutorService = mock(CacheExecutorService.class);
 | 
			
		||||
        willCallRealMethod().given(cachedAttributesService).getExecutor(any(String.class), any(CacheExecutorService.class));
 | 
			
		||||
 | 
			
		||||
        assertThat(cachedAttributesService.getExecutor(REDIS, cacheExecutorService), is(cacheExecutorService));
 | 
			
		||||
 | 
			
		||||
        assertThat(cachedAttributesService.getExecutor("unknownCacheType", cacheExecutorService), is(cacheExecutorService));
 | 
			
		||||
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user