cassandra buffered rate executor: separate beans for read and write operations
This commit is contained in:
		
							parent
							
								
									a852e11a3d
								
							
						
					
					
						commit
						7f1f298774
					
				@ -59,7 +59,8 @@ import org.thingsboard.server.dao.edge.EdgeEventService;
 | 
			
		||||
import org.thingsboard.server.dao.edge.EdgeService;
 | 
			
		||||
import org.thingsboard.server.dao.entityview.EntityViewService;
 | 
			
		||||
import org.thingsboard.server.dao.event.EventService;
 | 
			
		||||
import org.thingsboard.server.dao.nosql.CassandraBufferedRateExecutor;
 | 
			
		||||
import org.thingsboard.server.dao.nosql.CassandraBufferedRateReadExecutor;
 | 
			
		||||
import org.thingsboard.server.dao.nosql.CassandraBufferedRateWriteExecutor;
 | 
			
		||||
import org.thingsboard.server.dao.ota.OtaPackageService;
 | 
			
		||||
import org.thingsboard.server.dao.relation.RelationService;
 | 
			
		||||
import org.thingsboard.server.dao.resource.ResourceService;
 | 
			
		||||
@ -85,7 +86,6 @@ import org.thingsboard.server.cluster.TbClusterService;
 | 
			
		||||
import org.thingsboard.server.service.rpc.TbCoreDeviceRpcService;
 | 
			
		||||
import org.thingsboard.server.service.rpc.TbRpcService;
 | 
			
		||||
import org.thingsboard.server.service.rpc.TbRuleEngineDeviceRpcService;
 | 
			
		||||
import org.thingsboard.server.service.script.JsExecutorService;
 | 
			
		||||
import org.thingsboard.server.service.script.JsInvokeService;
 | 
			
		||||
import org.thingsboard.server.service.session.DeviceSessionCacheService;
 | 
			
		||||
import org.thingsboard.server.service.sms.SmsExecutorService;
 | 
			
		||||
@ -421,7 +421,11 @@ public class ActorSystemContext {
 | 
			
		||||
 | 
			
		||||
    @Autowired(required = false)
 | 
			
		||||
    @Getter
 | 
			
		||||
    private CassandraBufferedRateExecutor cassandraBufferedRateExecutor;
 | 
			
		||||
    private CassandraBufferedRateReadExecutor cassandraBufferedRateReadExecutor;
 | 
			
		||||
 | 
			
		||||
    @Autowired(required = false)
 | 
			
		||||
    @Getter
 | 
			
		||||
    private CassandraBufferedRateWriteExecutor cassandraBufferedRateWriteExecutor;
 | 
			
		||||
 | 
			
		||||
    @Autowired(required = false)
 | 
			
		||||
    @Getter
 | 
			
		||||
 | 
			
		||||
@ -557,8 +557,13 @@ class DefaultTbContext implements TbContext {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public TbResultSetFuture submitCassandraTask(CassandraStatementTask task) {
 | 
			
		||||
        return mainCtx.getCassandraBufferedRateExecutor().submit(task);
 | 
			
		||||
    public TbResultSetFuture submitCassandraReadTask(CassandraStatementTask task) {
 | 
			
		||||
        return mainCtx.getCassandraBufferedRateReadExecutor().submit(task);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public TbResultSetFuture submitCassandraWriteTask(CassandraStatementTask task) {
 | 
			
		||||
        return mainCtx.getCassandraBufferedRateWriteExecutor().submit(task);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
 | 
			
		||||
@ -26,6 +26,7 @@ import org.springframework.beans.factory.annotation.Qualifier;
 | 
			
		||||
import org.thingsboard.server.common.data.id.TenantId;
 | 
			
		||||
import org.thingsboard.server.dao.cassandra.CassandraCluster;
 | 
			
		||||
import org.thingsboard.server.dao.cassandra.guava.GuavaSession;
 | 
			
		||||
import org.thingsboard.server.dao.util.BufferedRateExecutor;
 | 
			
		||||
 | 
			
		||||
import java.util.concurrent.ConcurrentHashMap;
 | 
			
		||||
import java.util.concurrent.ConcurrentMap;
 | 
			
		||||
@ -40,7 +41,10 @@ public abstract class CassandraAbstractDao {
 | 
			
		||||
    private ConcurrentMap<String, PreparedStatement> preparedStatementMap = new ConcurrentHashMap<>();
 | 
			
		||||
 | 
			
		||||
    @Autowired
 | 
			
		||||
    private CassandraBufferedRateExecutor rateLimiter;
 | 
			
		||||
    private CassandraBufferedRateReadExecutor rateReadLimiter;
 | 
			
		||||
 | 
			
		||||
    @Autowired
 | 
			
		||||
    private CassandraBufferedRateWriteExecutor rateWriteLimiter;
 | 
			
		||||
 | 
			
		||||
    private GuavaSession session;
 | 
			
		||||
 | 
			
		||||
@ -61,36 +65,38 @@ public abstract class CassandraAbstractDao {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    protected AsyncResultSet executeRead(TenantId tenantId, Statement statement) {
 | 
			
		||||
        return execute(tenantId, statement, defaultReadLevel);
 | 
			
		||||
        return execute(tenantId, statement, defaultReadLevel, rateReadLimiter);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    protected AsyncResultSet executeWrite(TenantId tenantId, Statement statement) {
 | 
			
		||||
        return execute(tenantId, statement, defaultWriteLevel);
 | 
			
		||||
        return execute(tenantId, statement, defaultWriteLevel, rateWriteLimiter);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    protected TbResultSetFuture executeAsyncRead(TenantId tenantId, Statement statement) {
 | 
			
		||||
        return executeAsync(tenantId, statement, defaultReadLevel);
 | 
			
		||||
        return executeAsync(tenantId, statement, defaultReadLevel, rateReadLimiter);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    protected TbResultSetFuture executeAsyncWrite(TenantId tenantId, Statement statement) {
 | 
			
		||||
        return executeAsync(tenantId, statement, defaultWriteLevel);
 | 
			
		||||
        return executeAsync(tenantId, statement, defaultWriteLevel, rateWriteLimiter);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private AsyncResultSet execute(TenantId tenantId, Statement statement, ConsistencyLevel level) {
 | 
			
		||||
    private AsyncResultSet execute(TenantId tenantId, Statement statement, ConsistencyLevel level,
 | 
			
		||||
                                   BufferedRateExecutor<CassandraStatementTask, TbResultSetFuture> rateExecutor) {
 | 
			
		||||
        if (log.isDebugEnabled()) {
 | 
			
		||||
            log.debug("Execute cassandra statement {}", statementToString(statement));
 | 
			
		||||
        }
 | 
			
		||||
        return executeAsync(tenantId, statement, level).getUninterruptibly();
 | 
			
		||||
        return executeAsync(tenantId, statement, level, rateExecutor).getUninterruptibly();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private TbResultSetFuture executeAsync(TenantId tenantId, Statement statement, ConsistencyLevel level) {
 | 
			
		||||
    private TbResultSetFuture executeAsync(TenantId tenantId, Statement statement, ConsistencyLevel level,
 | 
			
		||||
                                           BufferedRateExecutor<CassandraStatementTask, TbResultSetFuture> rateExecutor) {
 | 
			
		||||
        if (log.isDebugEnabled()) {
 | 
			
		||||
            log.debug("Execute cassandra async statement {}", statementToString(statement));
 | 
			
		||||
        }
 | 
			
		||||
        if (statement.getConsistencyLevel() == null) {
 | 
			
		||||
            statement.setConsistencyLevel(level);
 | 
			
		||||
        }
 | 
			
		||||
        return rateLimiter.submit(new CassandraStatementTask(tenantId, getSession(), statement));
 | 
			
		||||
        return rateExecutor.submit(new CassandraStatementTask(tenantId, getSession(), statement));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private static String statementToString(Statement statement) {
 | 
			
		||||
 | 
			
		||||
@ -22,9 +22,6 @@ import org.springframework.beans.factory.annotation.Autowired;
 | 
			
		||||
import org.springframework.beans.factory.annotation.Value;
 | 
			
		||||
import org.springframework.scheduling.annotation.Scheduled;
 | 
			
		||||
import org.springframework.stereotype.Component;
 | 
			
		||||
import org.thingsboard.server.common.data.id.TenantId;
 | 
			
		||||
import org.thingsboard.server.common.stats.DefaultCounter;
 | 
			
		||||
import org.thingsboard.server.common.stats.StatsCounter;
 | 
			
		||||
import org.thingsboard.server.common.stats.StatsFactory;
 | 
			
		||||
import org.thingsboard.server.dao.entity.EntityService;
 | 
			
		||||
import org.thingsboard.server.dao.util.AbstractBufferedRateExecutor;
 | 
			
		||||
@ -32,8 +29,6 @@ import org.thingsboard.server.dao.util.AsyncTaskContext;
 | 
			
		||||
import org.thingsboard.server.dao.util.NoSqlAnyDao;
 | 
			
		||||
 | 
			
		||||
import javax.annotation.PreDestroy;
 | 
			
		||||
import java.util.HashMap;
 | 
			
		||||
import java.util.Map;
 | 
			
		||||
 | 
			
		||||
/**
 | 
			
		||||
 * Created by ashvayka on 24.10.18.
 | 
			
		||||
@ -41,15 +36,11 @@ import java.util.Map;
 | 
			
		||||
@Component
 | 
			
		||||
@Slf4j
 | 
			
		||||
@NoSqlAnyDao
 | 
			
		||||
public class CassandraBufferedRateExecutor extends AbstractBufferedRateExecutor<CassandraStatementTask, TbResultSetFuture, TbResultSet> {
 | 
			
		||||
public class CassandraBufferedRateReadExecutor extends AbstractBufferedRateExecutor<CassandraStatementTask, TbResultSetFuture, TbResultSet> {
 | 
			
		||||
 | 
			
		||||
    @Autowired
 | 
			
		||||
    private EntityService entityService;
 | 
			
		||||
    private Map<TenantId, String> tenantNamesCache = new HashMap<>();
 | 
			
		||||
    static final String BUFFER_NAME = "Read";
 | 
			
		||||
 | 
			
		||||
    private boolean printTenantNames;
 | 
			
		||||
 | 
			
		||||
    public CassandraBufferedRateExecutor(
 | 
			
		||||
    public CassandraBufferedRateReadExecutor(
 | 
			
		||||
            @Value("${cassandra.query.buffer_size}") int queueLimit,
 | 
			
		||||
            @Value("${cassandra.query.concurrent_limit}") int concurrencyLimit,
 | 
			
		||||
            @Value("${cassandra.query.permit_max_wait_time}") long maxWaitTime,
 | 
			
		||||
@ -60,57 +51,16 @@ public class CassandraBufferedRateExecutor extends AbstractBufferedRateExecutor<
 | 
			
		||||
            @Value("${cassandra.query.tenant_rate_limits.configuration}") String tenantRateLimitsConfiguration,
 | 
			
		||||
            @Value("${cassandra.query.tenant_rate_limits.print_tenant_names}") boolean printTenantNames,
 | 
			
		||||
            @Value("${cassandra.query.print_queries_freq:0}") int printQueriesFreq,
 | 
			
		||||
            @Autowired StatsFactory statsFactory) {
 | 
			
		||||
        super(queueLimit, concurrencyLimit, maxWaitTime, dispatcherThreads, callbackThreads, pollMs, tenantRateLimitsEnabled, tenantRateLimitsConfiguration, printQueriesFreq, statsFactory);
 | 
			
		||||
        this.printTenantNames = printTenantNames;
 | 
			
		||||
            @Autowired StatsFactory statsFactory,
 | 
			
		||||
            @Autowired EntityService entityService) {
 | 
			
		||||
        super(queueLimit, concurrencyLimit, maxWaitTime, dispatcherThreads, callbackThreads, pollMs, tenantRateLimitsEnabled, tenantRateLimitsConfiguration, printQueriesFreq, statsFactory,
 | 
			
		||||
                entityService, printTenantNames);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Scheduled(fixedDelayString = "${cassandra.query.rate_limit_print_interval_ms}")
 | 
			
		||||
    @Override
 | 
			
		||||
    public void printStats() {
 | 
			
		||||
        int queueSize = getQueueSize();
 | 
			
		||||
        int rateLimitedTenantsCount = (int) stats.getRateLimitedTenants().values().stream()
 | 
			
		||||
                .filter(defaultCounter -> defaultCounter.get() > 0)
 | 
			
		||||
                .count();
 | 
			
		||||
 | 
			
		||||
        if (queueSize > 0
 | 
			
		||||
                || rateLimitedTenantsCount > 0
 | 
			
		||||
                || concurrencyLevel.get() > 0
 | 
			
		||||
                || stats.getStatsCounters().stream().anyMatch(counter -> counter.get() > 0)
 | 
			
		||||
        ) {
 | 
			
		||||
            StringBuilder statsBuilder = new StringBuilder();
 | 
			
		||||
 | 
			
		||||
            statsBuilder.append("queueSize").append(" = [").append(queueSize).append("] ");
 | 
			
		||||
            stats.getStatsCounters().forEach(counter -> {
 | 
			
		||||
                statsBuilder.append(counter.getName()).append(" = [").append(counter.get()).append("] ");
 | 
			
		||||
            });
 | 
			
		||||
            statsBuilder.append("totalRateLimitedTenants").append(" = [").append(rateLimitedTenantsCount).append("] ");
 | 
			
		||||
            statsBuilder.append(CONCURRENCY_LEVEL).append(" = [").append(concurrencyLevel.get()).append("] ");
 | 
			
		||||
 | 
			
		||||
            stats.getStatsCounters().forEach(StatsCounter::clear);
 | 
			
		||||
            log.info("Permits {}", statsBuilder);
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        stats.getRateLimitedTenants().entrySet().stream()
 | 
			
		||||
                .filter(entry -> entry.getValue().get() > 0)
 | 
			
		||||
                .forEach(entry -> {
 | 
			
		||||
                    TenantId tenantId = entry.getKey();
 | 
			
		||||
                    DefaultCounter counter = entry.getValue();
 | 
			
		||||
                    int rateLimitedRequests = counter.get();
 | 
			
		||||
                    counter.clear();
 | 
			
		||||
                    if (printTenantNames) {
 | 
			
		||||
                        String name = tenantNamesCache.computeIfAbsent(tenantId, tId -> {
 | 
			
		||||
                            try {
 | 
			
		||||
                                return entityService.fetchEntityNameAsync(TenantId.SYS_TENANT_ID, tenantId).get();
 | 
			
		||||
                            } catch (Exception e) {
 | 
			
		||||
                                log.error("[{}] Failed to get tenant name", tenantId, e);
 | 
			
		||||
                                return "N/A";
 | 
			
		||||
                            }
 | 
			
		||||
                        });
 | 
			
		||||
                        log.info("[{}][{}] Rate limited requests: {}", tenantId, name, rateLimitedRequests);
 | 
			
		||||
                    } else {
 | 
			
		||||
                        log.info("[{}] Rate limited requests: {}", tenantId, rateLimitedRequests);
 | 
			
		||||
                    }
 | 
			
		||||
                });
 | 
			
		||||
        super.printStats();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @PreDestroy
 | 
			
		||||
@ -118,6 +68,11 @@ public class CassandraBufferedRateExecutor extends AbstractBufferedRateExecutor<
 | 
			
		||||
        super.stop();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public String getBufferName() {
 | 
			
		||||
        return BUFFER_NAME;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    protected SettableFuture<TbResultSet> create() {
 | 
			
		||||
        return SettableFuture.create();
 | 
			
		||||
@ -133,7 +88,7 @@ public class CassandraBufferedRateExecutor extends AbstractBufferedRateExecutor<
 | 
			
		||||
        CassandraStatementTask task = taskCtx.getTask();
 | 
			
		||||
        return task.executeAsync(
 | 
			
		||||
                statement ->
 | 
			
		||||
                    this.submit(new CassandraStatementTask(task.getTenantId(), task.getSession(), statement))
 | 
			
		||||
                        this.submit(new CassandraStatementTask(task.getTenantId(), task.getSession(), statement))
 | 
			
		||||
        );
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -0,0 +1,95 @@
 | 
			
		||||
/**
 | 
			
		||||
 * Copyright © 2016-2021 The Thingsboard Authors
 | 
			
		||||
 *
 | 
			
		||||
 * Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
 * you may not use this file except in compliance with the License.
 | 
			
		||||
 * You may obtain a copy of the License at
 | 
			
		||||
 *
 | 
			
		||||
 *     http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 *
 | 
			
		||||
 * Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
 * See the License for the specific language governing permissions and
 | 
			
		||||
 * limitations under the License.
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.server.dao.nosql;
 | 
			
		||||
 | 
			
		||||
import com.google.common.util.concurrent.ListenableFuture;
 | 
			
		||||
import com.google.common.util.concurrent.SettableFuture;
 | 
			
		||||
import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
import org.springframework.beans.factory.annotation.Autowired;
 | 
			
		||||
import org.springframework.beans.factory.annotation.Value;
 | 
			
		||||
import org.springframework.scheduling.annotation.Scheduled;
 | 
			
		||||
import org.springframework.stereotype.Component;
 | 
			
		||||
import org.thingsboard.server.common.stats.StatsFactory;
 | 
			
		||||
import org.thingsboard.server.dao.entity.EntityService;
 | 
			
		||||
import org.thingsboard.server.dao.util.AbstractBufferedRateExecutor;
 | 
			
		||||
import org.thingsboard.server.dao.util.AsyncTaskContext;
 | 
			
		||||
import org.thingsboard.server.dao.util.NoSqlAnyDao;
 | 
			
		||||
 | 
			
		||||
import javax.annotation.PreDestroy;
 | 
			
		||||
 | 
			
		||||
/**
 | 
			
		||||
 * Created by ashvayka on 24.10.18.
 | 
			
		||||
 */
 | 
			
		||||
@Component
 | 
			
		||||
@Slf4j
 | 
			
		||||
@NoSqlAnyDao
 | 
			
		||||
public class CassandraBufferedRateWriteExecutor extends AbstractBufferedRateExecutor<CassandraStatementTask, TbResultSetFuture, TbResultSet> {
 | 
			
		||||
 | 
			
		||||
    static final String BUFFER_NAME = "Write";
 | 
			
		||||
 | 
			
		||||
    public CassandraBufferedRateWriteExecutor(
 | 
			
		||||
            @Value("${cassandra.query.buffer_size}") int queueLimit,
 | 
			
		||||
            @Value("${cassandra.query.concurrent_limit}") int concurrencyLimit,
 | 
			
		||||
            @Value("${cassandra.query.permit_max_wait_time}") long maxWaitTime,
 | 
			
		||||
            @Value("${cassandra.query.dispatcher_threads:2}") int dispatcherThreads,
 | 
			
		||||
            @Value("${cassandra.query.callback_threads:4}") int callbackThreads,
 | 
			
		||||
            @Value("${cassandra.query.poll_ms:50}") long pollMs,
 | 
			
		||||
            @Value("${cassandra.query.tenant_rate_limits.enabled}") boolean tenantRateLimitsEnabled,
 | 
			
		||||
            @Value("${cassandra.query.tenant_rate_limits.configuration}") String tenantRateLimitsConfiguration,
 | 
			
		||||
            @Value("${cassandra.query.tenant_rate_limits.print_tenant_names}") boolean printTenantNames,
 | 
			
		||||
            @Value("${cassandra.query.print_queries_freq:0}") int printQueriesFreq,
 | 
			
		||||
            @Autowired StatsFactory statsFactory,
 | 
			
		||||
            @Autowired EntityService entityService) {
 | 
			
		||||
        super(queueLimit, concurrencyLimit, maxWaitTime, dispatcherThreads, callbackThreads, pollMs, tenantRateLimitsEnabled, tenantRateLimitsConfiguration, printQueriesFreq, statsFactory,
 | 
			
		||||
                entityService, printTenantNames);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Scheduled(fixedDelayString = "${cassandra.query.rate_limit_print_interval_ms}")
 | 
			
		||||
    @Override
 | 
			
		||||
    public void printStats() {
 | 
			
		||||
        super.printStats();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @PreDestroy
 | 
			
		||||
    public void stop() {
 | 
			
		||||
        super.stop();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public String getBufferName() {
 | 
			
		||||
        return BUFFER_NAME;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    protected SettableFuture<TbResultSet> create() {
 | 
			
		||||
        return SettableFuture.create();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    protected TbResultSetFuture wrap(CassandraStatementTask task, SettableFuture<TbResultSet> future) {
 | 
			
		||||
        return new TbResultSetFuture(future);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    protected ListenableFuture<TbResultSet> execute(AsyncTaskContext<CassandraStatementTask, TbResultSet> taskCtx) {
 | 
			
		||||
        CassandraStatementTask task = taskCtx.getTask();
 | 
			
		||||
        return task.executeAsync(
 | 
			
		||||
                statement ->
 | 
			
		||||
                        this.submit(new CassandraStatementTask(task.getTenantId(), task.getSession(), statement))
 | 
			
		||||
        );
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
@ -31,12 +31,17 @@ import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
import org.thingsboard.common.util.ThingsBoardExecutors;
 | 
			
		||||
import org.thingsboard.common.util.ThingsBoardThreadFactory;
 | 
			
		||||
import org.thingsboard.server.common.data.id.TenantId;
 | 
			
		||||
import org.thingsboard.server.common.msg.tools.TbRateLimits;
 | 
			
		||||
import org.thingsboard.server.common.stats.DefaultCounter;
 | 
			
		||||
import org.thingsboard.server.common.stats.StatsCounter;
 | 
			
		||||
import org.thingsboard.server.common.stats.StatsFactory;
 | 
			
		||||
import org.thingsboard.server.common.stats.StatsType;
 | 
			
		||||
import org.thingsboard.server.common.msg.tools.TbRateLimits;
 | 
			
		||||
import org.thingsboard.server.dao.entity.EntityService;
 | 
			
		||||
import org.thingsboard.server.dao.nosql.CassandraStatementTask;
 | 
			
		||||
 | 
			
		||||
import javax.annotation.Nullable;
 | 
			
		||||
import java.util.HashMap;
 | 
			
		||||
import java.util.Map;
 | 
			
		||||
import java.util.UUID;
 | 
			
		||||
import java.util.concurrent.BlockingQueue;
 | 
			
		||||
import java.util.concurrent.ConcurrentHashMap;
 | 
			
		||||
@ -75,22 +80,32 @@ public abstract class AbstractBufferedRateExecutor<T extends AsyncTask, F extend
 | 
			
		||||
    protected final AtomicInteger concurrencyLevel;
 | 
			
		||||
    protected final BufferedRateExecutorStats stats;
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    private final EntityService entityService;
 | 
			
		||||
    private final Map<TenantId, String> tenantNamesCache = new HashMap<>();
 | 
			
		||||
 | 
			
		||||
    private final boolean printTenantNames;
 | 
			
		||||
 | 
			
		||||
    public AbstractBufferedRateExecutor(int queueLimit, int concurrencyLimit, long maxWaitTime, int dispatcherThreads, int callbackThreads, long pollMs,
 | 
			
		||||
                                        boolean perTenantLimitsEnabled, String perTenantLimitsConfiguration, int printQueriesFreq, StatsFactory statsFactory) {
 | 
			
		||||
                                        boolean perTenantLimitsEnabled, String perTenantLimitsConfiguration, int printQueriesFreq, StatsFactory statsFactory,
 | 
			
		||||
                                        EntityService entityService, boolean printTenantNames) {
 | 
			
		||||
        this.maxWaitTime = maxWaitTime;
 | 
			
		||||
        this.pollMs = pollMs;
 | 
			
		||||
        this.concurrencyLimit = concurrencyLimit;
 | 
			
		||||
        this.printQueriesFreq = printQueriesFreq;
 | 
			
		||||
        this.queue = new LinkedBlockingDeque<>(queueLimit);
 | 
			
		||||
        this.dispatcherExecutor = Executors.newFixedThreadPool(dispatcherThreads, ThingsBoardThreadFactory.forName("nosql-dispatcher"));
 | 
			
		||||
        this.callbackExecutor = ThingsBoardExecutors.newWorkStealingPool(callbackThreads, getClass());
 | 
			
		||||
        this.timeoutExecutor = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("nosql-timeout"));
 | 
			
		||||
        this.dispatcherExecutor = Executors.newFixedThreadPool(dispatcherThreads, ThingsBoardThreadFactory.forName("nosql-" + getBufferName() + "-dispatcher"));
 | 
			
		||||
        this.callbackExecutor = ThingsBoardExecutors.newWorkStealingPool(callbackThreads, "nosql-" + getBufferName() + "-callback");
 | 
			
		||||
        this.timeoutExecutor = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("nosql-" + getBufferName() + "-timeout"));
 | 
			
		||||
        this.perTenantLimitsEnabled = perTenantLimitsEnabled;
 | 
			
		||||
        this.perTenantLimitsConfiguration = perTenantLimitsConfiguration;
 | 
			
		||||
        this.stats = new BufferedRateExecutorStats(statsFactory);
 | 
			
		||||
        String concurrencyLevelKey = StatsType.RATE_EXECUTOR.getName() + "." + CONCURRENCY_LEVEL;
 | 
			
		||||
        String concurrencyLevelKey = StatsType.RATE_EXECUTOR.getName() + "." + CONCURRENCY_LEVEL + getBufferName(); //metric name may change with buffer name suffix
 | 
			
		||||
        this.concurrencyLevel = statsFactory.createGauge(concurrencyLevelKey, new AtomicInteger(0));
 | 
			
		||||
 | 
			
		||||
        this.entityService = entityService;
 | 
			
		||||
        this.printTenantNames = printTenantNames;
 | 
			
		||||
 | 
			
		||||
        for (int i = 0; i < dispatcherThreads; i++) {
 | 
			
		||||
            dispatcherExecutor.submit(this::dispatch);
 | 
			
		||||
        }
 | 
			
		||||
@ -144,6 +159,8 @@ public abstract class AbstractBufferedRateExecutor<T extends AsyncTask, F extend
 | 
			
		||||
 | 
			
		||||
    protected abstract ListenableFuture<V> execute(AsyncTaskContext<T, V> taskCtx);
 | 
			
		||||
 | 
			
		||||
    public abstract String getBufferName();
 | 
			
		||||
 | 
			
		||||
    private void dispatch() {
 | 
			
		||||
        log.info("Buffered rate executor thread started");
 | 
			
		||||
        while (!Thread.interrupted()) {
 | 
			
		||||
@ -264,4 +281,51 @@ public abstract class AbstractBufferedRateExecutor<T extends AsyncTask, F extend
 | 
			
		||||
    protected int getQueueSize() {
 | 
			
		||||
        return queue.size();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public void printStats() {
 | 
			
		||||
        int queueSize = getQueueSize();
 | 
			
		||||
        int rateLimitedTenantsCount = (int) stats.getRateLimitedTenants().values().stream()
 | 
			
		||||
                .filter(defaultCounter -> defaultCounter.get() > 0)
 | 
			
		||||
                .count();
 | 
			
		||||
 | 
			
		||||
        if (queueSize > 0
 | 
			
		||||
                || rateLimitedTenantsCount > 0
 | 
			
		||||
                || concurrencyLevel.get() > 0
 | 
			
		||||
                || stats.getStatsCounters().stream().anyMatch(counter -> counter.get() > 0)
 | 
			
		||||
        ) {
 | 
			
		||||
            StringBuilder statsBuilder = new StringBuilder();
 | 
			
		||||
 | 
			
		||||
            statsBuilder.append("queueSize").append(" = [").append(queueSize).append("] ");
 | 
			
		||||
            stats.getStatsCounters().forEach(counter -> {
 | 
			
		||||
                statsBuilder.append(counter.getName()).append(" = [").append(counter.get()).append("] ");
 | 
			
		||||
            });
 | 
			
		||||
            statsBuilder.append("totalRateLimitedTenants").append(" = [").append(rateLimitedTenantsCount).append("] ");
 | 
			
		||||
            statsBuilder.append(CONCURRENCY_LEVEL).append(" = [").append(concurrencyLevel.get()).append("] ");
 | 
			
		||||
 | 
			
		||||
            stats.getStatsCounters().forEach(StatsCounter::clear);
 | 
			
		||||
            log.info("Permits {}", statsBuilder);
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        stats.getRateLimitedTenants().entrySet().stream()
 | 
			
		||||
                .filter(entry -> entry.getValue().get() > 0)
 | 
			
		||||
                .forEach(entry -> {
 | 
			
		||||
                    TenantId tenantId = entry.getKey();
 | 
			
		||||
                    DefaultCounter counter = entry.getValue();
 | 
			
		||||
                    int rateLimitedRequests = counter.get();
 | 
			
		||||
                    counter.clear();
 | 
			
		||||
                    if (printTenantNames) {
 | 
			
		||||
                        String name = tenantNamesCache.computeIfAbsent(tenantId, tId -> {
 | 
			
		||||
                            try {
 | 
			
		||||
                                return entityService.fetchEntityNameAsync(TenantId.SYS_TENANT_ID, tenantId).get();
 | 
			
		||||
                            } catch (Exception e) {
 | 
			
		||||
                                log.error("[{}] Failed to get tenant name", tenantId, e);
 | 
			
		||||
                                return "N/A";
 | 
			
		||||
                            }
 | 
			
		||||
                        });
 | 
			
		||||
                        log.info("[{}][{}] Rate limited requests: {}", tenantId, name, rateLimitedRequests);
 | 
			
		||||
                    } else {
 | 
			
		||||
                        log.info("[{}] Rate limited requests: {}", tenantId, rateLimitedRequests);
 | 
			
		||||
                    }
 | 
			
		||||
                });
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -20,6 +20,7 @@ import com.datastax.oss.driver.api.core.cql.BoundStatement;
 | 
			
		||||
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
 | 
			
		||||
import com.datastax.oss.driver.api.core.cql.Statement;
 | 
			
		||||
import com.google.common.util.concurrent.Futures;
 | 
			
		||||
import com.google.common.util.concurrent.SettableFuture;
 | 
			
		||||
import org.junit.Before;
 | 
			
		||||
import org.junit.Test;
 | 
			
		||||
import org.junit.runner.RunWith;
 | 
			
		||||
@ -38,6 +39,7 @@ import java.util.UUID;
 | 
			
		||||
import static org.mockito.ArgumentMatchers.any;
 | 
			
		||||
import static org.mockito.ArgumentMatchers.anyInt;
 | 
			
		||||
import static org.mockito.ArgumentMatchers.anyString;
 | 
			
		||||
import static org.mockito.BDDMockito.willReturn;
 | 
			
		||||
import static org.mockito.Mockito.doReturn;
 | 
			
		||||
import static org.mockito.Mockito.times;
 | 
			
		||||
import static org.mockito.Mockito.verify;
 | 
			
		||||
@ -58,9 +60,6 @@ public class CassandraPartitionsCacheTest {
 | 
			
		||||
    @Mock
 | 
			
		||||
    private Environment environment;
 | 
			
		||||
 | 
			
		||||
    @Mock
 | 
			
		||||
    private CassandraBufferedRateExecutor rateLimiter;
 | 
			
		||||
 | 
			
		||||
    @Mock
 | 
			
		||||
    private CassandraCluster cluster;
 | 
			
		||||
 | 
			
		||||
@ -74,7 +73,6 @@ public class CassandraPartitionsCacheTest {
 | 
			
		||||
        ReflectionTestUtils.setField(cassandraBaseTimeseriesDao, "systemTtl", 0);
 | 
			
		||||
        ReflectionTestUtils.setField(cassandraBaseTimeseriesDao, "setNullValuesEnabled", false);
 | 
			
		||||
        ReflectionTestUtils.setField(cassandraBaseTimeseriesDao, "environment", environment);
 | 
			
		||||
        ReflectionTestUtils.setField(cassandraBaseTimeseriesDao, "rateLimiter", rateLimiter);
 | 
			
		||||
        ReflectionTestUtils.setField(cassandraBaseTimeseriesDao, "cluster", cluster);
 | 
			
		||||
 | 
			
		||||
        when(cluster.getDefaultReadConsistencyLevel()).thenReturn(ConsistencyLevel.ONE);
 | 
			
		||||
@ -88,7 +86,9 @@ public class CassandraPartitionsCacheTest {
 | 
			
		||||
        when(boundStatement.setUuid(anyInt(), any(UUID.class))).thenReturn(boundStatement);
 | 
			
		||||
        when(boundStatement.setLong(anyInt(), any(Long.class))).thenReturn(boundStatement);
 | 
			
		||||
 | 
			
		||||
        doReturn(Futures.immediateFuture(0)).when(cassandraBaseTimeseriesDao).getFuture(any(TbResultSetFuture.class), any());
 | 
			
		||||
        willReturn(new TbResultSetFuture(SettableFuture.create())).given(cassandraBaseTimeseriesDao).executeAsyncWrite(any(), any());
 | 
			
		||||
 | 
			
		||||
        doReturn(Futures.immediateFuture(0)).when(cassandraBaseTimeseriesDao).getFuture(any(), any());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
@ -107,4 +107,5 @@ public class CassandraPartitionsCacheTest {
 | 
			
		||||
        }
 | 
			
		||||
        verify(cassandraBaseTimeseriesDao, times(60000)).executeAsyncWrite(any(TenantId.class), any(Statement.class));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -245,7 +245,9 @@ public interface TbContext {
 | 
			
		||||
 | 
			
		||||
    CassandraCluster getCassandraCluster();
 | 
			
		||||
 | 
			
		||||
    TbResultSetFuture submitCassandraTask(CassandraStatementTask task);
 | 
			
		||||
    TbResultSetFuture submitCassandraReadTask(CassandraStatementTask task);
 | 
			
		||||
 | 
			
		||||
    TbResultSetFuture submitCassandraWriteTask(CassandraStatementTask task);
 | 
			
		||||
 | 
			
		||||
    PageData<RuleNodeState> findRuleNodeStates(PageLink pageLink);
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -213,7 +213,7 @@ public class TbSaveToCustomCassandraTableNode implements TbNode {
 | 
			
		||||
        if (statement.getConsistencyLevel() == null) {
 | 
			
		||||
            statement.setConsistencyLevel(level);
 | 
			
		||||
        }
 | 
			
		||||
        return ctx.submitCassandraTask(new CassandraStatementTask(ctx.getTenantId(), getSession(), statement));
 | 
			
		||||
        return ctx.submitCassandraWriteTask(new CassandraStatementTask(ctx.getTenantId(), getSession(), statement));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private static String statementToString(Statement statement) {
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user