Thread pool naming CE (#4511)

* added ThingsBoardExecutors and ThingsBoardForkJoinWorkerThreadFactory to define the name for the pool thread

* thread poll named for the AbstractListeningExecutor

* thread poll named for the RestClient. ThingsBoard util dependency added

* most of thread polls named CE

* thread poll name added for telemetry-web-socket-ping

* executors: added custom names for executors (Queue, RuleEngine). Add topic name to the Thread name (useful for JMX and thread dump)

* fixed licence header for a new classes
This commit is contained in:
Sergey Matvienko 2021-04-29 14:15:50 +03:00 committed by GitHub
parent b4b5d062b0
commit 39f0775766
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 127 additions and 17 deletions

View File

@ -22,6 +22,7 @@ import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Service;
import org.thingsboard.common.util.ThingsBoardExecutors;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.DefaultTbActorSystem;
@ -108,7 +109,7 @@ public class DefaultActorService extends TbApplicationEventListener<PartitionCha
if (poolSize == 1) {
return Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName(dispatcherName));
} else {
return Executors.newWorkStealingPool(poolSize);
return ThingsBoardExecutors.newWorkStealingPool(poolSize, dispatcherName);
}
}

View File

@ -23,6 +23,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.rule.engine.api.MailService;
import org.thingsboard.server.common.data.ApiFeature;
import org.thingsboard.server.common.data.ApiUsageRecordKey;
@ -146,7 +147,7 @@ public class DefaultTbApiUsageStateService extends TbApplicationEventListener<Pa
this.scheduler = scheduler;
this.tenantProfileCache = tenantProfileCache;
this.mailService = mailService;
this.mailExecutor = Executors.newSingleThreadExecutor();
this.mailExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("api-usage-svc-mail"));
}
@PostConstruct

View File

@ -20,6 +20,7 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.rule.engine.api.RpcError;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.common.data.id.TenantId;
@ -127,7 +128,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
consumers.computeIfAbsent(configuration.getName(), queueName -> tbRuleEngineQueueFactory.createToRuleEngineMsgConsumer(configuration));
consumerStats.put(configuration.getName(), new TbRuleEngineConsumerStats(configuration.getName(), statsFactory));
}
submitExecutor = Executors.newSingleThreadExecutor();
submitExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("tb-rule-engine-consumer-service-submit-executor"));
}
@PreDestroy
@ -160,6 +161,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
private void launchConsumer(TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> consumer, TbRuleEngineQueueConfiguration configuration, TbRuleEngineConsumerStats stats) {
consumersExecutor.execute(() -> {
Thread.currentThread().setName("" + Thread.currentThread().getName() + "-" + configuration.getName());
while (!stopped) {
try {
List<TbProtoQueueMsg<ToRuleEngineMsg>> msgs = consumer.poll(pollDuration);

View File

@ -25,6 +25,7 @@ import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.thingsboard.common.util.ThingsBoardExecutors;
import org.thingsboard.server.queue.usagestats.TbApiUsageClient;
import org.thingsboard.server.service.apiusage.TbApiUsageStateService;
@ -93,7 +94,7 @@ public abstract class AbstractNashornJsInvokeService extends AbstractJsInvokeSer
super.init(maxRequestsTimeout);
if (useJsSandbox()) {
sandbox = NashornSandboxes.create();
monitorExecutorService = Executors.newWorkStealingPool(getMonitorThreadPoolSize());
monitorExecutorService = ThingsBoardExecutors.newWorkStealingPool(getMonitorThreadPoolSize(), "nashorn-js-monitor");
sandbox.setExecutor(monitorExecutorService);
sandbox.setMaxCPUTime(getMaxCpuTime());
sandbox.allowNoBraces(false);

View File

@ -20,6 +20,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Service;
import org.thingsboard.common.util.ThingsBoardExecutors;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.queue.discovery.event.ClusterTopologyChangeEvent;
import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent;
@ -62,7 +63,7 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer
private SubscriptionManagerService subscriptionManagerService;
private ExecutorService subscriptionUpdateExecutor;
private TbApplicationEventListener<PartitionChangeEvent> partitionChangeListener = new TbApplicationEventListener<>() {
@Override
protected void onTbApplicationEvent(PartitionChangeEvent event) {
@ -93,7 +94,7 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer
@PostConstruct
public void initExecutor() {
subscriptionUpdateExecutor = Executors.newWorkStealingPool(20);
subscriptionUpdateExecutor = ThingsBoardExecutors.newWorkStealingPool(20, getClass());
}
@PreDestroy

View File

@ -28,6 +28,8 @@ import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import org.springframework.web.socket.CloseStatus;
import org.thingsboard.common.util.ThingsBoardExecutors;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.EntityId;
@ -157,9 +159,9 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi
@PostConstruct
public void initExecutor() {
serviceId = serviceInfoProvider.getServiceId();
executor = Executors.newWorkStealingPool(50);
executor = ThingsBoardExecutors.newWorkStealingPool(50, getClass());
pingExecutor = Executors.newSingleThreadScheduledExecutor();
pingExecutor = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("telemetry-web-socket-ping"));
pingExecutor.scheduleWithFixedDelay(this::sendPing, 10000, 10000, TimeUnit.MILLISECONDS);
}

View File

@ -21,6 +21,7 @@ import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Service;
import org.thingsboard.common.util.ThingsBoardExecutors;
import org.thingsboard.server.common.stats.MessagesStats;
import org.thingsboard.server.common.stats.StatsFactory;
import org.thingsboard.server.common.stats.StatsType;
@ -70,7 +71,7 @@ public class TbCoreTransportApiService {
@PostConstruct
public void init() {
this.transportCallbackExecutor = Executors.newWorkStealingPool(maxCallbackThreads);
this.transportCallbackExecutor = ThingsBoardExecutors.newWorkStealingPool(maxCallbackThreads, getClass());
TbQueueProducer<TbProtoQueueMsg<TransportApiResponseMsg>> producer = tbCoreQueueFactory.createTransportApiResponseProducer();
TbQueueConsumer<TbProtoQueueMsg<TransportApiRequestMsg>> consumer = tbCoreQueueFactory.createTransportApiRequestConsumer();

View File

@ -20,6 +20,7 @@ import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import lombok.Builder;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.queue.TbQueueAdmin;
import org.thingsboard.server.queue.TbQueueCallback;
@ -77,7 +78,7 @@ public class DefaultTbQueueRequestTemplate<Request extends TbQueueMsg, Response
this.executor = executor;
} else {
internalExecutor = true;
this.executor = Executors.newSingleThreadExecutor();
this.executor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("tb-queue-request-template-" + responseTemplate.getTopic()));
}
}

View File

@ -17,6 +17,7 @@ package org.thingsboard.server.queue.common;
import lombok.Builder;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.TbQueueHandler;
@ -70,8 +71,8 @@ public class DefaultTbQueueResponseTemplate<Request extends TbQueueMsg, Response
this.requestTimeout = requestTimeout;
this.callbackExecutor = executor;
this.stats = stats;
this.timeoutExecutor = Executors.newSingleThreadScheduledExecutor();
this.loopExecutor = Executors.newSingleThreadExecutor();
this.timeoutExecutor = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("tb-queue-response-template-timeout-" + requestTemplate.getTopic()));
this.loopExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("tb-queue-response-template-loop-" + requestTemplate.getTopic()));
}
@Override

View File

@ -20,6 +20,7 @@ import lombok.Data;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.thingsboard.common.util.ThingsBoardExecutors;
import org.thingsboard.server.cache.firmware.FirmwareDataCache;
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
import org.thingsboard.server.queue.scheduler.SchedulerComponent;
@ -57,7 +58,7 @@ public abstract class TransportContext {
@PostConstruct
public void init() {
executor = Executors.newWorkStealingPool(50);
executor = ThingsBoardExecutors.newWorkStealingPool(50, getClass());
}
@PreDestroy

View File

@ -25,6 +25,7 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Service;
import org.thingsboard.common.util.ThingsBoardExecutors;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.common.data.ApiUsageRecordKey;
import org.thingsboard.server.common.data.ApiUsageState;
@ -186,7 +187,7 @@ public class DefaultTransportService implements TransportService {
this.ruleEngineProducerStats = statsFactory.createMessagesStats(StatsType.RULE_ENGINE.getName() + ".producer");
this.tbCoreProducerStats = statsFactory.createMessagesStats(StatsType.CORE.getName() + ".producer");
this.transportApiStats = statsFactory.createMessagesStats(StatsType.TRANSPORT.getName() + ".producer");
this.transportCallbackExecutor = Executors.newWorkStealingPool(20);
this.transportCallbackExecutor = ThingsBoardExecutors.newWorkStealingPool(20, getClass());
this.scheduler.scheduleAtFixedRate(this::checkInactivityAndReportActivity, new Random().nextInt((int) sessionReportTimeout), sessionReportTimeout, TimeUnit.MILLISECONDS);
transportApiRequestTemplate = queueProvider.createTransportApiRequestTemplate();
transportApiRequestTemplate.setMessagesStats(transportApiStats);

View File

@ -34,7 +34,7 @@ public abstract class AbstractListeningExecutor implements ListeningExecutor {
@PostConstruct
public void init() {
this.service = MoreExecutors.listeningDecorator(Executors.newWorkStealingPool(getThreadPollSize()));
this.service = MoreExecutors.listeningDecorator(ThingsBoardExecutors.newWorkStealingPool(getThreadPollSize(), getClass()));
}
@PreDestroy

View File

@ -0,0 +1,50 @@
/**
* Copyright © 2016-2021 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.common.util;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
public class ThingsBoardExecutors {
/**
* Method forked from ExecutorService to provide thread poll name
*
* Creates a thread pool that maintains enough threads to support
* the given parallelism level, and may use multiple queues to
* reduce contention. The parallelism level corresponds to the
* maximum number of threads actively engaged in, or available to
* engage in, task processing. The actual number of threads may
* grow and shrink dynamically. A work-stealing pool makes no
* guarantees about the order in which submitted tasks are
* executed.
*
* @param parallelism the targeted parallelism level
* @param namePrefix used to define thread name
* @return the newly created thread pool
* @throws IllegalArgumentException if {@code parallelism <= 0}
* @since 1.8
*/
public static ExecutorService newWorkStealingPool(int parallelism, String namePrefix) {
return new ForkJoinPool(parallelism,
new ThingsBoardForkJoinWorkerThreadFactory(namePrefix),
null, true);
}
public static ExecutorService newWorkStealingPool(int parallelism, Class clazz) {
return newWorkStealingPool(parallelism, clazz.getSimpleName());
}
}

View File

@ -0,0 +1,41 @@
/**
* Copyright © 2016-2021 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.common.util;
import lombok.NonNull;
import lombok.ToString;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.atomic.AtomicLong;
@ToString
public class ThingsBoardForkJoinWorkerThreadFactory implements ForkJoinPool.ForkJoinWorkerThreadFactory {
private final String namePrefix;
private final AtomicLong threadNumber = new AtomicLong(1);
public ThingsBoardForkJoinWorkerThreadFactory(@NonNull String namePrefix) {
this.namePrefix = namePrefix;
}
@Override
public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {
ForkJoinWorkerThread thread = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
thread.setName(namePrefix +"-"+thread.getPoolIndex()+"-"+threadNumber.getAndIncrement());
return thread;
}
}

View File

@ -28,6 +28,7 @@ import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.common.util.ThingsBoardExecutors;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.stats.StatsFactory;
@ -82,7 +83,7 @@ public abstract class AbstractBufferedRateExecutor<T extends AsyncTask, F extend
this.printQueriesFreq = printQueriesFreq;
this.queue = new LinkedBlockingDeque<>(queueLimit);
this.dispatcherExecutor = Executors.newFixedThreadPool(dispatcherThreads, ThingsBoardThreadFactory.forName("nosql-dispatcher"));
this.callbackExecutor = Executors.newWorkStealingPool(callbackThreads);
this.callbackExecutor = ThingsBoardExecutors.newWorkStealingPool(callbackThreads, getClass());
this.timeoutExecutor = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("nosql-timeout"));
this.perTenantLimitsEnabled = perTenantLimitsEnabled;
this.perTenantLimitsConfiguration = perTenantLimitsConfiguration;

View File

@ -43,6 +43,10 @@
<groupId>org.springframework</groupId>
<artifactId>spring-web</artifactId>
</dependency>
<dependency>
<groupId>org.thingsboard.common</groupId>
<artifactId>util</artifactId>
</dependency>
</dependencies>
<build>

View File

@ -31,6 +31,7 @@ import org.springframework.http.client.support.HttpRequestWrapper;
import org.springframework.util.StringUtils;
import org.springframework.web.client.HttpClientErrorException;
import org.springframework.web.client.RestTemplate;
import org.thingsboard.common.util.ThingsBoardExecutors;
import org.thingsboard.rest.client.utils.RestJsonConverter;
import org.thingsboard.server.common.data.AdminSettings;
import org.thingsboard.server.common.data.ClaimRequest;
@ -144,7 +145,7 @@ public class RestClient implements ClientHttpRequestInterceptor, Closeable {
private String token;
private String refreshToken;
private final ObjectMapper objectMapper = new ObjectMapper();
private ExecutorService service = Executors.newWorkStealingPool(10);
private ExecutorService service = ThingsBoardExecutors.newWorkStealingPool(10, getClass());
protected static final String ACTIVATE_TOKEN_REGEX = "/api/noauth/activate?activateToken=";