diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 5015ff090d..f6df3af742 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -183,6 +183,8 @@ cassandra: rate_limit_print_interval_ms: "${CASSANDRA_QUERY_RATE_LIMIT_PRINT_MS:10000}" # set all data types values except target to null for the same ts on save set_null_values_enabled: "${CASSANDRA_QUERY_SET_NULL_VALUES_ENABLED:false}" + # log one of cassandra queries with specified frequency (0 - logging is disabled) + print_queries_freq: "${CASSANDRA_QUERY_PRINT_FREQ:0}" tenant_rate_limits: enabled: "${CASSANDRA_QUERY_TENANT_RATE_LIMITS_ENABLED:false}" configuration: "${CASSANDRA_QUERY_TENANT_RATE_LIMITS_CONFIGURATION:1000:1,30000:60}" diff --git a/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraBufferedRateExecutor.java b/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraBufferedRateExecutor.java index 12c2ee85f5..37aaa532fd 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraBufferedRateExecutor.java +++ b/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraBufferedRateExecutor.java @@ -56,8 +56,9 @@ public class CassandraBufferedRateExecutor extends AbstractBufferedRateExecutor< @Value("${cassandra.query.poll_ms:50}") long pollMs, @Value("${cassandra.query.tenant_rate_limits.enabled}") boolean tenantRateLimitsEnabled, @Value("${cassandra.query.tenant_rate_limits.configuration}") String tenantRateLimitsConfiguration, - @Value("${cassandra.query.tenant_rate_limits.print_tenant_names}") boolean printTenantNames) { - super(queueLimit, concurrencyLimit, maxWaitTime, dispatcherThreads, callbackThreads, pollMs, tenantRateLimitsEnabled, tenantRateLimitsConfiguration); + @Value("${cassandra.query.tenant_rate_limits.print_tenant_names}") boolean printTenantNames, + @Value("${cassandra.query.print_queries_freq:0}") int printQueriesFreq) { + super(queueLimit, concurrencyLimit, maxWaitTime, dispatcherThreads, callbackThreads, pollMs, tenantRateLimitsEnabled, tenantRateLimitsConfiguration, printQueriesFreq); this.printTenantNames = printTenantNames; } diff --git a/dao/src/main/java/org/thingsboard/server/dao/util/AbstractBufferedRateExecutor.java b/dao/src/main/java/org/thingsboard/server/dao/util/AbstractBufferedRateExecutor.java index c22321ee52..a553aa5f9c 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/util/AbstractBufferedRateExecutor.java +++ b/dao/src/main/java/org/thingsboard/server/dao/util/AbstractBufferedRateExecutor.java @@ -44,6 +44,7 @@ public abstract class AbstractBufferedRateExecutor perTenantLimits = new ConcurrentHashMap<>(); @@ -57,12 +58,14 @@ public abstract class AbstractBufferedRateExecutor(queueLimit); this.dispatcherExecutor = Executors.newFixedThreadPool(dispatcherThreads); this.callbackExecutor = Executors.newWorkStealingPool(callbackThreads); @@ -131,6 +134,13 @@ public abstract class AbstractBufferedRateExecutor finalTaskCtx = taskCtx; + if (printQueriesFreq > 0) { + if (printQueriesIdx.incrementAndGet() >= printQueriesFreq) { + printQueriesIdx.set(0); + String query = queryToString(finalTaskCtx); + log.info("[{}] Cassandra query: {}", taskCtx.getId(), query); + } + } logTask("Processing", finalTaskCtx); concurrencyLevel.incrementAndGet(); long timeout = finalTaskCtx.getCreateTime() + maxWaitTime - System.currentTimeMillis(); @@ -187,17 +197,8 @@ public abstract class AbstractBufferedRateExecutor taskCtx) { if (log.isTraceEnabled()) { if (taskCtx.getTask() instanceof CassandraStatementTask) { - CassandraStatementTask cassStmtTask = (CassandraStatementTask) taskCtx.getTask(); - if (cassStmtTask.getStatement() instanceof BoundStatement) { - BoundStatement stmt = (BoundStatement) cassStmtTask.getStatement(); - String query = stmt.preparedStatement().getQueryString(); - try { - query = toStringWithValues(stmt, ProtocolVersion.V5); - } catch (Exception e) { - log.warn("Can't convert to query with values", e); - } - log.trace("[{}] {} task: {}, BoundStatement query: {}", taskCtx.getId(), action, taskCtx, query); - } + String query = queryToString(taskCtx); + log.trace("[{}] {} task: {}, BoundStatement query: {}", taskCtx.getId(), action, taskCtx, query); } else { log.trace("[{}] {} task: {}", taskCtx.getId(), action, taskCtx); } @@ -206,6 +207,22 @@ public abstract class AbstractBufferedRateExecutor taskCtx) { + CassandraStatementTask cassStmtTask = (CassandraStatementTask) taskCtx.getTask(); + if (cassStmtTask.getStatement() instanceof BoundStatement) { + BoundStatement stmt = (BoundStatement) cassStmtTask.getStatement(); + String query = stmt.preparedStatement().getQueryString(); + try { + query = toStringWithValues(stmt, ProtocolVersion.V5); + } catch (Exception e) { + log.warn("Can't convert to query with values", e); + } + return query; + } else { + return "Not Cassandra Statement Task"; + } + } + private static String toStringWithValues(BoundStatement boundStatement, ProtocolVersion protocolVersion) { CodecRegistry codecRegistry = boundStatement.preparedStatement().getCodecRegistry(); PreparedStatement preparedStatement = boundStatement.preparedStatement();