More clear thread names
This commit is contained in:
parent
27d0fc3b0a
commit
cacdf09eef
@ -95,8 +95,6 @@ public class DefaultActorService implements ActorService {
|
||||
|
||||
private ActorRef rpcManagerActor;
|
||||
|
||||
private ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
|
||||
|
||||
@PostConstruct
|
||||
public void initActorSystem() {
|
||||
log.info("Initializing Actor system.");
|
||||
|
||||
@ -31,6 +31,7 @@ import org.springframework.web.bind.annotation.RequestMethod;
|
||||
import org.springframework.web.bind.annotation.ResponseBody;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
import org.springframework.web.context.request.async.DeferredResult;
|
||||
import org.thingsboard.common.util.ThingsBoardThreadFactory;
|
||||
import org.thingsboard.rule.engine.api.RpcError;
|
||||
import org.thingsboard.server.common.data.audit.ActionType;
|
||||
import org.thingsboard.server.common.data.exception.ThingsboardErrorCode;
|
||||
@ -76,20 +77,6 @@ public class RpcController extends BaseController {
|
||||
@Autowired
|
||||
private AccessValidator accessValidator;
|
||||
|
||||
private ExecutorService executor;
|
||||
|
||||
@PostConstruct
|
||||
public void initExecutor() {
|
||||
executor = Executors.newSingleThreadExecutor();
|
||||
}
|
||||
|
||||
@PreDestroy
|
||||
public void shutdownExecutor() {
|
||||
if (executor != null) {
|
||||
executor.shutdownNow();
|
||||
}
|
||||
}
|
||||
|
||||
@PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')")
|
||||
@RequestMapping(value = "/oneway/{deviceId}", method = RequestMethod.POST)
|
||||
@ResponseBody
|
||||
|
||||
@ -37,6 +37,7 @@ import org.springframework.web.bind.annotation.RequestParam;
|
||||
import org.springframework.web.bind.annotation.ResponseBody;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
import org.springframework.web.context.request.async.DeferredResult;
|
||||
import org.thingsboard.common.util.ThingsBoardThreadFactory;
|
||||
import org.thingsboard.rule.engine.api.msg.DeviceAttributesEventNotificationMsg;
|
||||
import org.thingsboard.server.common.data.DataConstants;
|
||||
import org.thingsboard.server.common.data.EntityType;
|
||||
@ -108,7 +109,7 @@ public class TelemetryController extends BaseController {
|
||||
|
||||
@PostConstruct
|
||||
public void initExecutor() {
|
||||
executor = Executors.newSingleThreadExecutor();
|
||||
executor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("telemetry-controller"));
|
||||
}
|
||||
|
||||
@PreDestroy
|
||||
|
||||
@ -41,6 +41,7 @@ import org.springframework.context.annotation.Lazy;
|
||||
import org.springframework.context.event.EventListener;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.util.Assert;
|
||||
import org.thingsboard.common.util.ThingsBoardThreadFactory;
|
||||
import org.thingsboard.server.actors.service.ActorService;
|
||||
import org.thingsboard.server.common.msg.cluster.ServerAddress;
|
||||
import org.thingsboard.server.service.cluster.routing.ClusterRoutingService;
|
||||
@ -114,7 +115,7 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
|
||||
Assert.notNull(zkConnectionTimeout, MiscUtils.missingProperty("zk.connection_timeout_ms"));
|
||||
Assert.notNull(zkSessionTimeout, MiscUtils.missingProperty("zk.session_timeout_ms"));
|
||||
|
||||
reconnectExecutorService = Executors.newSingleThreadExecutor();
|
||||
reconnectExecutorService = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("zk-discovery"));
|
||||
|
||||
log.info("Initializing discovery service using ZK connect string: {}", zkUrl);
|
||||
|
||||
|
||||
@ -24,6 +24,7 @@ import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.context.annotation.Lazy;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.thingsboard.common.util.ThingsBoardThreadFactory;
|
||||
import org.thingsboard.rule.engine.api.RpcError;
|
||||
import org.thingsboard.rule.engine.api.msg.ToDeviceActorNotificationMsg;
|
||||
import org.thingsboard.server.actors.service.ActorService;
|
||||
@ -83,7 +84,7 @@ public class DefaultDeviceRpcService implements DeviceRpcService {
|
||||
|
||||
@PostConstruct
|
||||
public void initExecutor() {
|
||||
rpcCallBackExecutor = Executors.newSingleThreadScheduledExecutor();
|
||||
rpcCallBackExecutor = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("rpc-callback"));
|
||||
}
|
||||
|
||||
@PreDestroy
|
||||
|
||||
@ -26,6 +26,7 @@ import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
import org.thingsboard.common.util.ThingsBoardThreadFactory;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import javax.annotation.PostConstruct;
|
||||
@ -87,7 +88,7 @@ public abstract class AbstractNashornJsInvokeService extends AbstractJsInvokeSer
|
||||
@PostConstruct
|
||||
public void init() {
|
||||
if (maxRequestsTimeout > 0) {
|
||||
timeoutExecutorService = Executors.newSingleThreadScheduledExecutor();
|
||||
timeoutExecutorService = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("nashorn-js-timeout"));
|
||||
}
|
||||
if (useJsSandbox()) {
|
||||
sandbox = NashornSandboxes.create();
|
||||
|
||||
@ -24,6 +24,7 @@ import org.springframework.http.HttpStatus;
|
||||
import org.springframework.http.ResponseEntity;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.web.context.request.async.DeferredResult;
|
||||
import org.thingsboard.common.util.ThingsBoardThreadFactory;
|
||||
import org.thingsboard.server.common.data.Customer;
|
||||
import org.thingsboard.server.common.data.Device;
|
||||
import org.thingsboard.server.common.data.EntityView;
|
||||
@ -105,7 +106,7 @@ public class AccessValidator {
|
||||
|
||||
@PostConstruct
|
||||
public void initExecutor() {
|
||||
executor = Executors.newSingleThreadExecutor();
|
||||
executor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("access-validator"));
|
||||
}
|
||||
|
||||
@PreDestroy
|
||||
|
||||
@ -31,6 +31,7 @@ import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.context.annotation.Lazy;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.util.StringUtils;
|
||||
import org.thingsboard.common.util.ThingsBoardThreadFactory;
|
||||
import org.thingsboard.server.actors.service.ActorService;
|
||||
import org.thingsboard.server.common.data.DataConstants;
|
||||
import org.thingsboard.server.common.data.Device;
|
||||
@ -144,7 +145,7 @@ public class DefaultDeviceStateService implements DeviceStateService {
|
||||
@PostConstruct
|
||||
public void init() {
|
||||
// Should be always single threaded due to absence of locks.
|
||||
queueExecutor = MoreExecutors.listeningDecorator(Executors.newSingleThreadScheduledExecutor());
|
||||
queueExecutor = MoreExecutors.listeningDecorator(Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("device-state")));
|
||||
queueExecutor.submit(this::initStateFromDB);
|
||||
queueExecutor.scheduleAtFixedRate(this::updateState, new Random().nextInt(defaultStateCheckIntervalInSec), defaultStateCheckIntervalInSec, TimeUnit.SECONDS);
|
||||
//TODO: schedule persistence in v2.1;
|
||||
|
||||
@ -24,6 +24,7 @@ import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.context.annotation.Lazy;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.util.StringUtils;
|
||||
import org.thingsboard.common.util.ThingsBoardThreadFactory;
|
||||
import org.thingsboard.rule.engine.api.msg.DeviceAttributesEventNotificationMsg;
|
||||
import org.thingsboard.common.util.DonAsynchron;
|
||||
import org.thingsboard.server.actors.service.ActorService;
|
||||
@ -110,8 +111,8 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio
|
||||
|
||||
@PostConstruct
|
||||
public void initExecutor() {
|
||||
tsCallBackExecutor = Executors.newSingleThreadExecutor();
|
||||
wsCallBackExecutor = Executors.newSingleThreadExecutor();
|
||||
tsCallBackExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("ts-sub-callback"));
|
||||
wsCallBackExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("ws-sub-callback"));
|
||||
}
|
||||
|
||||
@PreDestroy
|
||||
|
||||
@ -19,6 +19,7 @@ import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.thingsboard.common.util.ThingsBoardThreadFactory;
|
||||
import org.thingsboard.rule.engine.api.RuleChainTransactionService;
|
||||
import org.thingsboard.server.common.data.id.EntityId;
|
||||
import org.thingsboard.server.common.msg.TbMsg;
|
||||
@ -71,7 +72,7 @@ public class BaseRuleChainTransactionService implements RuleChainTransactionServ
|
||||
|
||||
@PostConstruct
|
||||
public void init() {
|
||||
timeoutExecutor = Executors.newSingleThreadExecutor();
|
||||
timeoutExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("rule-chain-transaction"));
|
||||
executeOnTimeout();
|
||||
}
|
||||
|
||||
|
||||
@ -32,6 +32,7 @@ import org.springframework.boot.context.event.ApplicationReadyEvent;
|
||||
import org.springframework.context.event.EventListener;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.thingsboard.common.util.ThingsBoardThreadFactory;
|
||||
import org.thingsboard.server.actors.ActorSystemContext;
|
||||
import org.thingsboard.server.common.msg.cluster.ServerAddress;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.DeviceActorToTransportMsg;
|
||||
@ -103,7 +104,7 @@ public class RemoteRuleEngineTransportService implements RuleEngineTransportServ
|
||||
private TBKafkaConsumerTemplate<ToRuleEngineMsg> ruleEngineConsumer;
|
||||
private TBKafkaProducerTemplate<ToTransportMsg> notificationsProducer;
|
||||
|
||||
private ExecutorService mainConsumerExecutor = Executors.newSingleThreadExecutor();
|
||||
private ExecutorService mainConsumerExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("tb-main-consumer"));
|
||||
|
||||
private volatile boolean stopped = false;
|
||||
|
||||
|
||||
@ -22,6 +22,7 @@ import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.web.client.RestTemplate;
|
||||
import org.thingsboard.common.util.ThingsBoardThreadFactory;
|
||||
import org.thingsboard.server.common.data.UpdateMessage;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
@ -50,7 +51,7 @@ public class DefaultUpdateService implements UpdateService {
|
||||
@Value("${updates.enabled}")
|
||||
private boolean updatesEnabled;
|
||||
|
||||
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
|
||||
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1, ThingsBoardThreadFactory.forName("tb-update-service"));
|
||||
|
||||
private ScheduledFuture checkUpdatesFuture = null;
|
||||
private RestTemplate restClient = new RestTemplate();
|
||||
|
||||
@ -195,10 +195,6 @@ cassandra:
|
||||
|
||||
# SQL configuration parameters
|
||||
sql:
|
||||
# Specify executor service type used to perform timeseries insert tasks: SINGLE or FIXED
|
||||
ts_inserts_executor_type: "${SQL_TS_INSERTS_EXECUTOR_TYPE:fixed}"
|
||||
# Specify thread pool size for FIXED executor service type
|
||||
ts_inserts_fixed_thread_pool_size: "${SQL_TS_INSERTS_FIXED_THREAD_POOL_SIZE:200}"
|
||||
# Specify batch size for persisting attribute updates
|
||||
attributes:
|
||||
batch_size: "${SQL_ATTRIBUTES_BATCH_SIZE:10000}"
|
||||
|
||||
@ -44,6 +44,10 @@
|
||||
<groupId>org.thingsboard.common</groupId>
|
||||
<artifactId>message</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.thingsboard.common</groupId>
|
||||
<artifactId>util</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.thingsboard.common</groupId>
|
||||
<artifactId>queue</artifactId>
|
||||
|
||||
@ -17,6 +17,7 @@ package org.thingsboard.server.common.transport.service;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.thingsboard.common.util.ThingsBoardThreadFactory;
|
||||
import org.thingsboard.server.common.data.EntityType;
|
||||
import org.thingsboard.server.common.data.id.DeviceId;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
@ -300,7 +301,7 @@ public abstract class AbstractTransportService implements TransportService {
|
||||
new TbRateLimits(perTenantLimitsConf);
|
||||
new TbRateLimits(perDevicesLimitsConf);
|
||||
}
|
||||
this.schedulerExecutor = Executors.newSingleThreadScheduledExecutor();
|
||||
this.schedulerExecutor = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("transport-scheduler"));
|
||||
this.transportCallbackExecutor = Executors.newWorkStealingPool(20);
|
||||
this.schedulerExecutor.scheduleAtFixedRate(this::checkInactivityAndReportActivity, new Random().nextInt((int) sessionReportTimeout), sessionReportTimeout, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
@ -25,6 +25,7 @@ import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.thingsboard.common.util.ThingsBoardThreadFactory;
|
||||
import org.thingsboard.server.common.transport.TransportServiceCallback;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ClaimDeviceMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeRequestMsg;
|
||||
@ -100,7 +101,7 @@ public class RemoteTransportService extends AbstractTransportService {
|
||||
private TBKafkaProducerTemplate<ToRuleEngineMsg> ruleEngineProducer;
|
||||
private TBKafkaConsumerTemplate<ToTransportMsg> mainConsumer;
|
||||
|
||||
private ExecutorService mainConsumerExecutor = Executors.newSingleThreadExecutor();
|
||||
private ExecutorService mainConsumerExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("remote-transport-consumer"));
|
||||
|
||||
private volatile boolean stopped = false;
|
||||
|
||||
|
||||
@ -0,0 +1,54 @@
|
||||
/**
|
||||
* Copyright © 2016-2019 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.common.util;
|
||||
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
/**
|
||||
* Copy of Executors.DefaultThreadFactory but with ability to set name of the pool
|
||||
*/
|
||||
public class ThingsBoardThreadFactory implements ThreadFactory {
|
||||
private static final AtomicInteger poolNumber = new AtomicInteger(1);
|
||||
private final ThreadGroup group;
|
||||
private final AtomicInteger threadNumber = new AtomicInteger(1);
|
||||
private final String namePrefix;
|
||||
|
||||
public static ThingsBoardThreadFactory forName(String name) {
|
||||
return new ThingsBoardThreadFactory(name);
|
||||
}
|
||||
|
||||
private ThingsBoardThreadFactory(String name) {
|
||||
SecurityManager s = System.getSecurityManager();
|
||||
group = (s != null) ? s.getThreadGroup() :
|
||||
Thread.currentThread().getThreadGroup();
|
||||
namePrefix = name + "-" +
|
||||
poolNumber.getAndIncrement() +
|
||||
"-thread-";
|
||||
}
|
||||
|
||||
@Override
|
||||
public Thread newThread(Runnable r) {
|
||||
Thread t = new Thread(group, r,
|
||||
namePrefix + threadNumber.getAndIncrement(),
|
||||
0);
|
||||
if (t.isDaemon())
|
||||
t.setDaemon(false);
|
||||
if (t.getPriority() != Thread.NORM_PRIORITY)
|
||||
t.setPriority(Thread.NORM_PRIORITY);
|
||||
return t;
|
||||
}
|
||||
}
|
||||
@ -24,6 +24,7 @@ import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.util.StringUtils;
|
||||
import org.thingsboard.common.util.ThingsBoardThreadFactory;
|
||||
import org.thingsboard.server.common.data.EntityType;
|
||||
import org.thingsboard.server.common.data.Tenant;
|
||||
import org.thingsboard.server.common.data.alarm.Alarm;
|
||||
@ -81,7 +82,7 @@ public class BaseAlarmService extends AbstractEntityService implements AlarmServ
|
||||
|
||||
@PostConstruct
|
||||
public void startExecutor() {
|
||||
readResultsProcessingExecutor = Executors.newCachedThreadPool();
|
||||
readResultsProcessingExecutor = Executors.newCachedThreadPool(ThingsBoardThreadFactory.forName("alarm-service"));
|
||||
}
|
||||
|
||||
@PreDestroy
|
||||
|
||||
@ -29,6 +29,7 @@ import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.core.env.Environment;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.thingsboard.common.util.ThingsBoardThreadFactory;
|
||||
import org.thingsboard.server.common.data.audit.ActionType;
|
||||
import org.thingsboard.server.common.data.audit.AuditLog;
|
||||
import org.thingsboard.server.common.data.id.CustomerId;
|
||||
@ -117,7 +118,7 @@ public class CassandraAuditLogDao extends CassandraAbstractSearchTimeDao<AuditLo
|
||||
throw new RuntimeException("Failed to parse partitioning property: " + partitioning + "!");
|
||||
}
|
||||
}
|
||||
readResultsProcessingExecutor = Executors.newCachedThreadPool();
|
||||
readResultsProcessingExecutor = Executors.newCachedThreadPool(ThingsBoardThreadFactory.forName("audit-log"));
|
||||
}
|
||||
|
||||
@PreDestroy
|
||||
|
||||
@ -20,6 +20,7 @@ import com.datastax.driver.core.ResultSetFuture;
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.util.concurrent.Futures;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import org.thingsboard.common.util.ThingsBoardThreadFactory;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import javax.annotation.PostConstruct;
|
||||
@ -36,7 +37,7 @@ public abstract class CassandraAbstractAsyncDao extends CassandraAbstractDao {
|
||||
|
||||
@PostConstruct
|
||||
public void startExecutor() {
|
||||
readResultsProcessingExecutor = Executors.newCachedThreadPool();
|
||||
readResultsProcessingExecutor = Executors.newCachedThreadPool(ThingsBoardThreadFactory.forName("cassandra-callback"));
|
||||
}
|
||||
|
||||
@PreDestroy
|
||||
|
||||
@ -16,6 +16,7 @@
|
||||
package org.thingsboard.server.dao.sql;
|
||||
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.thingsboard.common.util.ThingsBoardThreadFactory;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
import javax.annotation.PreDestroy;
|
||||
@ -30,7 +31,7 @@ public class ScheduledLogExecutorComponent {
|
||||
|
||||
@PostConstruct
|
||||
public void init() {
|
||||
schedulerLogExecutor = Executors.newSingleThreadScheduledExecutor();
|
||||
schedulerLogExecutor = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("sql-log"));
|
||||
}
|
||||
|
||||
@PreDestroy
|
||||
|
||||
@ -18,6 +18,7 @@ package org.thingsboard.server.dao.sql;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import com.google.common.util.concurrent.SettableFuture;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.thingsboard.common.util.ThingsBoardThreadFactory;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
@ -49,7 +50,7 @@ public class TbSqlBlockingQueue<E> implements TbSqlQueue<E> {
|
||||
@Override
|
||||
public void init(ScheduledLogExecutorComponent logExecutor, Consumer<List<E>> saveFunction) {
|
||||
this.logExecutor = logExecutor;
|
||||
executor = Executors.newSingleThreadExecutor();
|
||||
executor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("sql-queue-" + params.getLogName().toLowerCase()));
|
||||
executor.submit(() -> {
|
||||
String logName = params.getLogName();
|
||||
int batchSize = params.getBatchSize();
|
||||
|
||||
@ -44,44 +44,6 @@ public abstract class AbstractSqlTimeseriesDao extends JpaAbstractDaoListeningEx
|
||||
|
||||
private static final String DESC_ORDER = "DESC";
|
||||
|
||||
@Value("${sql.ts_inserts_executor_type}")
|
||||
private String insertExecutorType;
|
||||
|
||||
@Value("${sql.ts_inserts_fixed_thread_pool_size}")
|
||||
private int insertFixedThreadPoolSize;
|
||||
|
||||
@Value("${spring.datasource.hikari.maximumPoolSize}")
|
||||
private int maximumPoolSize;
|
||||
|
||||
protected ListeningExecutorService insertService;
|
||||
|
||||
@PostConstruct
|
||||
void init() {
|
||||
Optional<TsInsertExecutorType> executorTypeOptional = TsInsertExecutorType.parse(insertExecutorType);
|
||||
TsInsertExecutorType executorType;
|
||||
executorType = executorTypeOptional.orElse(TsInsertExecutorType.FIXED);
|
||||
switch (executorType) {
|
||||
case SINGLE:
|
||||
insertService = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
|
||||
break;
|
||||
case FIXED:
|
||||
case CACHED:
|
||||
int poolSize = insertFixedThreadPoolSize;
|
||||
if (poolSize <= 0) {
|
||||
poolSize = maximumPoolSize * 4;
|
||||
}
|
||||
insertService = MoreExecutors.listeningDecorator(Executors.newWorkStealingPool(poolSize));
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
@PreDestroy
|
||||
void preDestroy() {
|
||||
if (insertService != null) {
|
||||
insertService.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
protected ListenableFuture<List<TsKvEntry>> processFindAllAsync(TenantId tenantId, EntityId entityId, List<ReadTsKvQuery> queries) {
|
||||
List<ListenableFuture<List<TsKvEntry>>> futures = queries
|
||||
.stream()
|
||||
|
||||
@ -170,12 +170,12 @@ public class TimescaleTimeseriesDao extends AbstractSqlTimeseriesDao implements
|
||||
|
||||
@Override
|
||||
public ListenableFuture<Void> savePartition(TenantId tenantId, EntityId entityId, long tsKvEntryTs, String key, long ttl) {
|
||||
return insertService.submit(() -> null);
|
||||
return Futures.immediateFuture(null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ListenableFuture<Void> saveLatest(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry) {
|
||||
return insertService.submit(() -> null);
|
||||
return Futures.immediateFuture(null);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@ -334,7 +334,7 @@ public class JpaTimeseriesDao extends AbstractSqlTimeseriesDao implements Timese
|
||||
|
||||
@Override
|
||||
public ListenableFuture<Void> savePartition(TenantId tenantId, EntityId entityId, long tsKvEntryTs, String key, long ttl) {
|
||||
return insertService.submit(() -> null);
|
||||
return Futures.immediateFuture(null);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@ -21,6 +21,7 @@ import com.google.common.util.concurrent.Futures;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import com.google.common.util.concurrent.SettableFuture;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.thingsboard.common.util.ThingsBoardThreadFactory;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.msg.tools.TbRateLimits;
|
||||
import org.thingsboard.server.dao.nosql.CassandraStatementTask;
|
||||
@ -67,9 +68,9 @@ public abstract class AbstractBufferedRateExecutor<T extends AsyncTask, F extend
|
||||
this.concurrencyLimit = concurrencyLimit;
|
||||
this.printQueriesFreq = printQueriesFreq;
|
||||
this.queue = new LinkedBlockingDeque<>(queueLimit);
|
||||
this.dispatcherExecutor = Executors.newFixedThreadPool(dispatcherThreads);
|
||||
this.dispatcherExecutor = Executors.newFixedThreadPool(dispatcherThreads, ThingsBoardThreadFactory.forName("nosql-dispatcher"));
|
||||
this.callbackExecutor = Executors.newWorkStealingPool(callbackThreads);
|
||||
this.timeoutExecutor = Executors.newSingleThreadScheduledExecutor();
|
||||
this.timeoutExecutor = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("nosql-timeout"));
|
||||
this.perTenantLimitsEnabled = perTenantLimitsEnabled;
|
||||
this.perTenantLimitsConfiguration = perTenantLimitsConfiguration;
|
||||
for (int i = 0; i < dispatcherThreads; i++) {
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user