Print Cassandra Queries with specified frequency

This commit is contained in:
Andrew Shvayka 2019-11-26 16:48:08 +02:00
parent e97ead2ce5
commit d40c054ca3
3 changed files with 34 additions and 14 deletions

View File

@ -183,6 +183,8 @@ cassandra:
rate_limit_print_interval_ms: "${CASSANDRA_QUERY_RATE_LIMIT_PRINT_MS:10000}" rate_limit_print_interval_ms: "${CASSANDRA_QUERY_RATE_LIMIT_PRINT_MS:10000}"
# set all data types values except target to null for the same ts on save # set all data types values except target to null for the same ts on save
set_null_values_enabled: "${CASSANDRA_QUERY_SET_NULL_VALUES_ENABLED:false}" set_null_values_enabled: "${CASSANDRA_QUERY_SET_NULL_VALUES_ENABLED:false}"
# log one of cassandra queries with specified frequency (0 - logging is disabled)
print_queries_freq: "${CASSANDRA_QUERY_PRINT_FREQ:0}"
tenant_rate_limits: tenant_rate_limits:
enabled: "${CASSANDRA_QUERY_TENANT_RATE_LIMITS_ENABLED:false}" enabled: "${CASSANDRA_QUERY_TENANT_RATE_LIMITS_ENABLED:false}"
configuration: "${CASSANDRA_QUERY_TENANT_RATE_LIMITS_CONFIGURATION:1000:1,30000:60}" configuration: "${CASSANDRA_QUERY_TENANT_RATE_LIMITS_CONFIGURATION:1000:1,30000:60}"

View File

@ -56,8 +56,9 @@ public class CassandraBufferedRateExecutor extends AbstractBufferedRateExecutor<
@Value("${cassandra.query.poll_ms:50}") long pollMs, @Value("${cassandra.query.poll_ms:50}") long pollMs,
@Value("${cassandra.query.tenant_rate_limits.enabled}") boolean tenantRateLimitsEnabled, @Value("${cassandra.query.tenant_rate_limits.enabled}") boolean tenantRateLimitsEnabled,
@Value("${cassandra.query.tenant_rate_limits.configuration}") String tenantRateLimitsConfiguration, @Value("${cassandra.query.tenant_rate_limits.configuration}") String tenantRateLimitsConfiguration,
@Value("${cassandra.query.tenant_rate_limits.print_tenant_names}") boolean printTenantNames) { @Value("${cassandra.query.tenant_rate_limits.print_tenant_names}") boolean printTenantNames,
super(queueLimit, concurrencyLimit, maxWaitTime, dispatcherThreads, callbackThreads, pollMs, tenantRateLimitsEnabled, tenantRateLimitsConfiguration); @Value("${cassandra.query.print_queries_freq:0}") int printQueriesFreq) {
super(queueLimit, concurrencyLimit, maxWaitTime, dispatcherThreads, callbackThreads, pollMs, tenantRateLimitsEnabled, tenantRateLimitsConfiguration, printQueriesFreq);
this.printTenantNames = printTenantNames; this.printTenantNames = printTenantNames;
} }

View File

@ -44,6 +44,7 @@ public abstract class AbstractBufferedRateExecutor<T extends AsyncTask, F extend
private final ExecutorService callbackExecutor; private final ExecutorService callbackExecutor;
private final ScheduledExecutorService timeoutExecutor; private final ScheduledExecutorService timeoutExecutor;
private final int concurrencyLimit; private final int concurrencyLimit;
private final int printQueriesFreq;
private final boolean perTenantLimitsEnabled; private final boolean perTenantLimitsEnabled;
private final String perTenantLimitsConfiguration; private final String perTenantLimitsConfiguration;
private final ConcurrentMap<TenantId, TbRateLimits> perTenantLimits = new ConcurrentHashMap<>(); private final ConcurrentMap<TenantId, TbRateLimits> perTenantLimits = new ConcurrentHashMap<>();
@ -57,12 +58,14 @@ public abstract class AbstractBufferedRateExecutor<T extends AsyncTask, F extend
protected final AtomicInteger totalExpired = new AtomicInteger(); protected final AtomicInteger totalExpired = new AtomicInteger();
protected final AtomicInteger totalRejected = new AtomicInteger(); protected final AtomicInteger totalRejected = new AtomicInteger();
protected final AtomicInteger totalRateLimited = new AtomicInteger(); protected final AtomicInteger totalRateLimited = new AtomicInteger();
protected final AtomicInteger printQueriesIdx = new AtomicInteger();
public AbstractBufferedRateExecutor(int queueLimit, int concurrencyLimit, long maxWaitTime, int dispatcherThreads, int callbackThreads, long pollMs, public AbstractBufferedRateExecutor(int queueLimit, int concurrencyLimit, long maxWaitTime, int dispatcherThreads, int callbackThreads, long pollMs,
boolean perTenantLimitsEnabled, String perTenantLimitsConfiguration) { boolean perTenantLimitsEnabled, String perTenantLimitsConfiguration, int printQueriesFreq) {
this.maxWaitTime = maxWaitTime; this.maxWaitTime = maxWaitTime;
this.pollMs = pollMs; this.pollMs = pollMs;
this.concurrencyLimit = concurrencyLimit; this.concurrencyLimit = concurrencyLimit;
this.printQueriesFreq = printQueriesFreq;
this.queue = new LinkedBlockingDeque<>(queueLimit); this.queue = new LinkedBlockingDeque<>(queueLimit);
this.dispatcherExecutor = Executors.newFixedThreadPool(dispatcherThreads); this.dispatcherExecutor = Executors.newFixedThreadPool(dispatcherThreads);
this.callbackExecutor = Executors.newWorkStealingPool(callbackThreads); this.callbackExecutor = Executors.newWorkStealingPool(callbackThreads);
@ -131,6 +134,13 @@ public abstract class AbstractBufferedRateExecutor<T extends AsyncTask, F extend
if (curLvl <= concurrencyLimit) { if (curLvl <= concurrencyLimit) {
taskCtx = queue.take(); taskCtx = queue.take();
final AsyncTaskContext<T, V> finalTaskCtx = taskCtx; final AsyncTaskContext<T, V> finalTaskCtx = taskCtx;
if (printQueriesFreq > 0) {
if (printQueriesIdx.incrementAndGet() >= printQueriesFreq) {
printQueriesIdx.set(0);
String query = queryToString(finalTaskCtx);
log.info("[{}] Cassandra query: {}", taskCtx.getId(), query);
}
}
logTask("Processing", finalTaskCtx); logTask("Processing", finalTaskCtx);
concurrencyLevel.incrementAndGet(); concurrencyLevel.incrementAndGet();
long timeout = finalTaskCtx.getCreateTime() + maxWaitTime - System.currentTimeMillis(); long timeout = finalTaskCtx.getCreateTime() + maxWaitTime - System.currentTimeMillis();
@ -187,17 +197,8 @@ public abstract class AbstractBufferedRateExecutor<T extends AsyncTask, F extend
private void logTask(String action, AsyncTaskContext<T, V> taskCtx) { private void logTask(String action, AsyncTaskContext<T, V> taskCtx) {
if (log.isTraceEnabled()) { if (log.isTraceEnabled()) {
if (taskCtx.getTask() instanceof CassandraStatementTask) { if (taskCtx.getTask() instanceof CassandraStatementTask) {
CassandraStatementTask cassStmtTask = (CassandraStatementTask) taskCtx.getTask(); String query = queryToString(taskCtx);
if (cassStmtTask.getStatement() instanceof BoundStatement) { log.trace("[{}] {} task: {}, BoundStatement query: {}", taskCtx.getId(), action, taskCtx, query);
BoundStatement stmt = (BoundStatement) cassStmtTask.getStatement();
String query = stmt.preparedStatement().getQueryString();
try {
query = toStringWithValues(stmt, ProtocolVersion.V5);
} catch (Exception e) {
log.warn("Can't convert to query with values", e);
}
log.trace("[{}] {} task: {}, BoundStatement query: {}", taskCtx.getId(), action, taskCtx, query);
}
} else { } else {
log.trace("[{}] {} task: {}", taskCtx.getId(), action, taskCtx); log.trace("[{}] {} task: {}", taskCtx.getId(), action, taskCtx);
} }
@ -206,6 +207,22 @@ public abstract class AbstractBufferedRateExecutor<T extends AsyncTask, F extend
} }
} }
private String queryToString(AsyncTaskContext<T, V> taskCtx) {
CassandraStatementTask cassStmtTask = (CassandraStatementTask) taskCtx.getTask();
if (cassStmtTask.getStatement() instanceof BoundStatement) {
BoundStatement stmt = (BoundStatement) cassStmtTask.getStatement();
String query = stmt.preparedStatement().getQueryString();
try {
query = toStringWithValues(stmt, ProtocolVersion.V5);
} catch (Exception e) {
log.warn("Can't convert to query with values", e);
}
return query;
} else {
return "Not Cassandra Statement Task";
}
}
private static String toStringWithValues(BoundStatement boundStatement, ProtocolVersion protocolVersion) { private static String toStringWithValues(BoundStatement boundStatement, ProtocolVersion protocolVersion) {
CodecRegistry codecRegistry = boundStatement.preparedStatement().getCodecRegistry(); CodecRegistry codecRegistry = boundStatement.preparedStatement().getCodecRegistry();
PreparedStatement preparedStatement = boundStatement.preparedStatement(); PreparedStatement preparedStatement = boundStatement.preparedStatement();