Merge pull request #9016 from smatvienko-tb/feature/cassandra-result_processing_threads

Feature/ Cassandra unlimited cached thread pool replaced with limited pool to prevent stack memory peaks
This commit is contained in:
Andrew Shvayka 2023-07-31 13:27:41 +03:00 committed by GitHub
commit 1248b66004
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 16 additions and 12 deletions

View File

@ -245,7 +245,8 @@ cassandra:
concurrent_limit: "${CASSANDRA_QUERY_CONCURRENT_LIMIT:1000}" concurrent_limit: "${CASSANDRA_QUERY_CONCURRENT_LIMIT:1000}"
permit_max_wait_time: "${PERMIT_MAX_WAIT_TIME:120000}" permit_max_wait_time: "${PERMIT_MAX_WAIT_TIME:120000}"
dispatcher_threads: "${CASSANDRA_QUERY_DISPATCHER_THREADS:2}" dispatcher_threads: "${CASSANDRA_QUERY_DISPATCHER_THREADS:2}"
callback_threads: "${CASSANDRA_QUERY_CALLBACK_THREADS:4}" callback_threads: "${CASSANDRA_QUERY_CALLBACK_THREADS:4}" # Buffered rate executor (read, write), for managing I/O rate. See "nosql-*-callback" threads in JMX
result_processing_threads: "${CASSANDRA_QUERY_RESULT_PROCESSING_THREADS:50}" # Result set transformer and processing. See "cassandra-callback" threads in JMX
poll_ms: "${CASSANDRA_QUERY_POLL_MS:50}" poll_ms: "${CASSANDRA_QUERY_POLL_MS:50}"
rate_limit_print_interval_ms: "${CASSANDRA_QUERY_RATE_LIMIT_PRINT_MS:10000}" rate_limit_print_interval_ms: "${CASSANDRA_QUERY_RATE_LIMIT_PRINT_MS:10000}"
# set all data types values except target to null for the same ts on save # set all data types values except target to null for the same ts on save

View File

@ -19,13 +19,13 @@ import com.google.common.base.Function;
import com.google.common.util.concurrent.AsyncFunction; import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.springframework.beans.factory.annotation.Value;
import org.thingsboard.common.util.ThingsBoardExecutors;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy; import javax.annotation.PreDestroy;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/** /**
* Created by ashvayka on 21.02.17. * Created by ashvayka on 21.02.17.
@ -34,9 +34,12 @@ public abstract class CassandraAbstractAsyncDao extends CassandraAbstractDao {
protected ExecutorService readResultsProcessingExecutor; protected ExecutorService readResultsProcessingExecutor;
@Value("${cassandra.query.result_processing_threads:50}")
private int threadPoolSize;
@PostConstruct @PostConstruct
public void startExecutor() { public void startExecutor() {
readResultsProcessingExecutor = Executors.newCachedThreadPool(ThingsBoardThreadFactory.forName("cassandra-callback")); readResultsProcessingExecutor = ThingsBoardExecutors.newWorkStealingPool(threadPoolSize, "cassandra-callback");
} }
@PreDestroy @PreDestroy

View File

@ -124,7 +124,7 @@ public abstract class AbstractBufferedRateExecutor<T extends AsyncTask, F extend
if (tenantProfileConfiguration != null && if (tenantProfileConfiguration != null &&
StringUtils.isNotEmpty(tenantProfileConfiguration.getCassandraQueryTenantRateLimitsConfiguration())) { StringUtils.isNotEmpty(tenantProfileConfiguration.getCassandraQueryTenantRateLimitsConfiguration())) {
if (task.getTenantId() == null) { if (task.getTenantId() == null) {
log.info("Invalid task received: {}", task); log.info("[{}] Invalid task received: {}", getBufferName(), task);
} else if (!task.getTenantId().isNullUid()) { } else if (!task.getTenantId().isNullUid()) {
TbRateLimits rateLimits = perTenantLimits.computeIfAbsent( TbRateLimits rateLimits = perTenantLimits.computeIfAbsent(
task.getTenantId(), id -> new TbRateLimits(tenantProfileConfiguration.getCassandraQueryTenantRateLimitsConfiguration()) task.getTenantId(), id -> new TbRateLimits(tenantProfileConfiguration.getCassandraQueryTenantRateLimitsConfiguration())
@ -173,7 +173,7 @@ public abstract class AbstractBufferedRateExecutor<T extends AsyncTask, F extend
public abstract String getBufferName(); public abstract String getBufferName();
private void dispatch() { private void dispatch() {
log.info("Buffered rate executor thread started"); log.info("[{}] Buffered rate executor thread started", getBufferName());
while (!Thread.interrupted()) { while (!Thread.interrupted()) {
int curLvl = concurrencyLevel.get(); int curLvl = concurrencyLevel.get();
AsyncTaskContext<T, V> taskCtx = null; AsyncTaskContext<T, V> taskCtx = null;
@ -185,7 +185,7 @@ public abstract class AbstractBufferedRateExecutor<T extends AsyncTask, F extend
if (printQueriesIdx.incrementAndGet() >= printQueriesFreq) { if (printQueriesIdx.incrementAndGet() >= printQueriesFreq) {
printQueriesIdx.set(0); printQueriesIdx.set(0);
String query = queryToString(finalTaskCtx); String query = queryToString(finalTaskCtx);
log.info("[{}] Cassandra query: {}", taskCtx.getId(), query); log.info("[{}][{}] Cassandra query: {}", getBufferName(), taskCtx.getId(), query);
} }
} }
logTask("Processing", finalTaskCtx); logTask("Processing", finalTaskCtx);
@ -238,7 +238,7 @@ public abstract class AbstractBufferedRateExecutor<T extends AsyncTask, F extend
} }
} }
} }
log.info("Buffered rate executor thread stopped"); log.info("[{}] Buffered rate executor thread stopped", getBufferName());
} }
private void logTask(String action, AsyncTaskContext<T, V> taskCtx) { private void logTask(String action, AsyncTaskContext<T, V> taskCtx) {
@ -314,7 +314,7 @@ public abstract class AbstractBufferedRateExecutor<T extends AsyncTask, F extend
statsBuilder.append(CONCURRENCY_LEVEL).append(" = [").append(concurrencyLevel.get()).append("] "); statsBuilder.append(CONCURRENCY_LEVEL).append(" = [").append(concurrencyLevel.get()).append("] ");
stats.getStatsCounters().forEach(StatsCounter::clear); stats.getStatsCounters().forEach(StatsCounter::clear);
log.info("Permits {}", statsBuilder); log.info("[{}] Permits {}", getBufferName(), statsBuilder);
} }
stats.getRateLimitedTenants().entrySet().stream() stats.getRateLimitedTenants().entrySet().stream()
@ -330,13 +330,13 @@ public abstract class AbstractBufferedRateExecutor<T extends AsyncTask, F extend
try { try {
return entityService.fetchEntityName(TenantId.SYS_TENANT_ID, tenantId).orElse(defaultName); return entityService.fetchEntityName(TenantId.SYS_TENANT_ID, tenantId).orElse(defaultName);
} catch (Exception e) { } catch (Exception e) {
log.error("[{}] Failed to get tenant name", tenantId, e); log.error("[{}][{}] Failed to get tenant name", getBufferName(), tenantId, e);
return defaultName; return defaultName;
} }
}); });
log.info("[{}][{}] Rate limited requests: {}", tenantId, name, rateLimitedRequests); log.info("[{}][{}][{}] Rate limited requests: {}", getBufferName(), tenantId, name, rateLimitedRequests);
} else { } else {
log.info("[{}] Rate limited requests: {}", tenantId, rateLimitedRequests); log.info("[{}][{}] Rate limited requests: {}", getBufferName(), tenantId, rateLimitedRequests);
} }
}); });
} }