Cassandra: unlimited cached thread pool replaced with limited pool to prevent stack memory peaks. New env CASSANDRA_QUERY_RESULT_PROCESSING_THREADS introduced.

This commit is contained in:
Sergey Matvienko 2023-07-31 11:59:31 +02:00
parent cb37450470
commit 161d7c3eda
2 changed files with 8 additions and 4 deletions

View File

@ -245,7 +245,8 @@ cassandra:
concurrent_limit: "${CASSANDRA_QUERY_CONCURRENT_LIMIT:1000}" concurrent_limit: "${CASSANDRA_QUERY_CONCURRENT_LIMIT:1000}"
permit_max_wait_time: "${PERMIT_MAX_WAIT_TIME:120000}" permit_max_wait_time: "${PERMIT_MAX_WAIT_TIME:120000}"
dispatcher_threads: "${CASSANDRA_QUERY_DISPATCHER_THREADS:2}" dispatcher_threads: "${CASSANDRA_QUERY_DISPATCHER_THREADS:2}"
callback_threads: "${CASSANDRA_QUERY_CALLBACK_THREADS:4}" callback_threads: "${CASSANDRA_QUERY_CALLBACK_THREADS:4}" # Buffered rate executor (read, write), for managing I/O rate. See "nosql-*-callback" threads in JMX
result_processing_threads: "${CASSANDRA_QUERY_RESULT_PROCESSING_THREADS:50}" # Result set transformer and processing. See "cassandra-callback" threads in JMX
poll_ms: "${CASSANDRA_QUERY_POLL_MS:50}" poll_ms: "${CASSANDRA_QUERY_POLL_MS:50}"
rate_limit_print_interval_ms: "${CASSANDRA_QUERY_RATE_LIMIT_PRINT_MS:10000}" rate_limit_print_interval_ms: "${CASSANDRA_QUERY_RATE_LIMIT_PRINT_MS:10000}"
# set all data types values except target to null for the same ts on save # set all data types values except target to null for the same ts on save

View File

@ -19,13 +19,13 @@ import com.google.common.base.Function;
import com.google.common.util.concurrent.AsyncFunction; import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.springframework.beans.factory.annotation.Value;
import org.thingsboard.common.util.ThingsBoardExecutors;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy; import javax.annotation.PreDestroy;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/** /**
* Created by ashvayka on 21.02.17. * Created by ashvayka on 21.02.17.
@ -34,9 +34,12 @@ public abstract class CassandraAbstractAsyncDao extends CassandraAbstractDao {
protected ExecutorService readResultsProcessingExecutor; protected ExecutorService readResultsProcessingExecutor;
@Value("${cassandra.query.result_processing_threads:50}")
private int threadPoolSize;
@PostConstruct @PostConstruct
public void startExecutor() { public void startExecutor() {
readResultsProcessingExecutor = Executors.newCachedThreadPool(ThingsBoardThreadFactory.forName("cassandra-callback")); readResultsProcessingExecutor = ThingsBoardExecutors.newWorkStealingPool(threadPoolSize, "cassandra-callback");
} }
@PreDestroy @PreDestroy