From cacdf09eef1a1f68588e5a5d4349c0458c9043a6 Mon Sep 17 00:00:00 2001 From: Andrew Shvayka Date: Fri, 20 Dec 2019 13:50:07 +0200 Subject: [PATCH] More clear thread names --- .../actors/service/DefaultActorService.java | 2 - .../server/controller/RpcController.java | 15 +----- .../controller/TelemetryController.java | 3 +- .../cluster/discovery/ZkDiscoveryService.java | 3 +- .../service/rpc/DefaultDeviceRpcService.java | 3 +- .../AbstractNashornJsInvokeService.java | 3 +- .../service/security/AccessValidator.java | 3 +- .../state/DefaultDeviceStateService.java | 3 +- .../DefaultTelemetrySubscriptionService.java | 5 +- .../BaseRuleChainTransactionService.java | 3 +- .../RemoteRuleEngineTransportService.java | 3 +- .../service/update/DefaultUpdateService.java | 3 +- .../src/main/resources/thingsboard.yml | 4 -- common/transport/transport-api/pom.xml | 4 ++ .../service/AbstractTransportService.java | 3 +- .../service/RemoteTransportService.java | 3 +- .../common/util/ThingsBoardThreadFactory.java | 54 +++++++++++++++++++ .../server/dao/alarm/BaseAlarmService.java | 3 +- .../dao/audit/CassandraAuditLogDao.java | 3 +- .../dao/nosql/CassandraAbstractAsyncDao.java | 3 +- .../sql/ScheduledLogExecutorComponent.java | 3 +- .../server/dao/sql/TbSqlBlockingQueue.java | 3 +- .../dao/sqlts/AbstractSqlTimeseriesDao.java | 38 ------------- .../timescale/TimescaleTimeseriesDao.java | 4 +- .../server/dao/sqlts/ts/JpaTimeseriesDao.java | 2 +- .../util/AbstractBufferedRateExecutor.java | 5 +- 26 files changed, 100 insertions(+), 81 deletions(-) create mode 100644 common/util/src/main/java/org/thingsboard/common/util/ThingsBoardThreadFactory.java diff --git a/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java b/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java index 852fbe2884..2979d9310b 100644 --- a/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java +++ b/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java @@ -95,8 +95,6 @@ public class DefaultActorService implements ActorService { private ActorRef rpcManagerActor; - private ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); - @PostConstruct public void initActorSystem() { log.info("Initializing Actor system."); diff --git a/application/src/main/java/org/thingsboard/server/controller/RpcController.java b/application/src/main/java/org/thingsboard/server/controller/RpcController.java index 3b624e9b94..0980ca068c 100644 --- a/application/src/main/java/org/thingsboard/server/controller/RpcController.java +++ b/application/src/main/java/org/thingsboard/server/controller/RpcController.java @@ -31,6 +31,7 @@ import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.ResponseBody; import org.springframework.web.bind.annotation.RestController; import org.springframework.web.context.request.async.DeferredResult; +import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.rule.engine.api.RpcError; import org.thingsboard.server.common.data.audit.ActionType; import org.thingsboard.server.common.data.exception.ThingsboardErrorCode; @@ -76,20 +77,6 @@ public class RpcController extends BaseController { @Autowired private AccessValidator accessValidator; - private ExecutorService executor; - - @PostConstruct - public void initExecutor() { - executor = Executors.newSingleThreadExecutor(); - } - - @PreDestroy - public void shutdownExecutor() { - if (executor != null) { - executor.shutdownNow(); - } - } - @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')") @RequestMapping(value = "/oneway/{deviceId}", method = RequestMethod.POST) @ResponseBody diff --git a/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java b/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java index d46e6475c0..a7987cec3d 100644 --- a/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java +++ b/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java @@ -37,6 +37,7 @@ import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.ResponseBody; import org.springframework.web.bind.annotation.RestController; import org.springframework.web.context.request.async.DeferredResult; +import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.rule.engine.api.msg.DeviceAttributesEventNotificationMsg; import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.EntityType; @@ -108,7 +109,7 @@ public class TelemetryController extends BaseController { @PostConstruct public void initExecutor() { - executor = Executors.newSingleThreadExecutor(); + executor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("telemetry-controller")); } @PreDestroy diff --git a/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ZkDiscoveryService.java b/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ZkDiscoveryService.java index e44c28dc39..a34c8dded0 100644 --- a/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ZkDiscoveryService.java +++ b/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ZkDiscoveryService.java @@ -41,6 +41,7 @@ import org.springframework.context.annotation.Lazy; import org.springframework.context.event.EventListener; import org.springframework.stereotype.Service; import org.springframework.util.Assert; +import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.server.actors.service.ActorService; import org.thingsboard.server.common.msg.cluster.ServerAddress; import org.thingsboard.server.service.cluster.routing.ClusterRoutingService; @@ -114,7 +115,7 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi Assert.notNull(zkConnectionTimeout, MiscUtils.missingProperty("zk.connection_timeout_ms")); Assert.notNull(zkSessionTimeout, MiscUtils.missingProperty("zk.session_timeout_ms")); - reconnectExecutorService = Executors.newSingleThreadExecutor(); + reconnectExecutorService = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("zk-discovery")); log.info("Initializing discovery service using ZK connect string: {}", zkUrl); diff --git a/application/src/main/java/org/thingsboard/server/service/rpc/DefaultDeviceRpcService.java b/application/src/main/java/org/thingsboard/server/service/rpc/DefaultDeviceRpcService.java index 2e1078fa8e..5fe8cb30df 100644 --- a/application/src/main/java/org/thingsboard/server/service/rpc/DefaultDeviceRpcService.java +++ b/application/src/main/java/org/thingsboard/server/service/rpc/DefaultDeviceRpcService.java @@ -24,6 +24,7 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Service; +import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.rule.engine.api.RpcError; import org.thingsboard.rule.engine.api.msg.ToDeviceActorNotificationMsg; import org.thingsboard.server.actors.service.ActorService; @@ -83,7 +84,7 @@ public class DefaultDeviceRpcService implements DeviceRpcService { @PostConstruct public void initExecutor() { - rpcCallBackExecutor = Executors.newSingleThreadScheduledExecutor(); + rpcCallBackExecutor = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("rpc-callback")); } @PreDestroy diff --git a/application/src/main/java/org/thingsboard/server/service/script/AbstractNashornJsInvokeService.java b/application/src/main/java/org/thingsboard/server/service/script/AbstractNashornJsInvokeService.java index 807c43ffa7..6c0afbf2e2 100644 --- a/application/src/main/java/org/thingsboard/server/service/script/AbstractNashornJsInvokeService.java +++ b/application/src/main/java/org/thingsboard/server/service/script/AbstractNashornJsInvokeService.java @@ -26,6 +26,7 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.scheduling.annotation.Scheduled; +import org.thingsboard.common.util.ThingsBoardThreadFactory; import javax.annotation.Nullable; import javax.annotation.PostConstruct; @@ -87,7 +88,7 @@ public abstract class AbstractNashornJsInvokeService extends AbstractJsInvokeSer @PostConstruct public void init() { if (maxRequestsTimeout > 0) { - timeoutExecutorService = Executors.newSingleThreadScheduledExecutor(); + timeoutExecutorService = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("nashorn-js-timeout")); } if (useJsSandbox()) { sandbox = NashornSandboxes.create(); diff --git a/application/src/main/java/org/thingsboard/server/service/security/AccessValidator.java b/application/src/main/java/org/thingsboard/server/service/security/AccessValidator.java index 6b6e2553fa..f49db67e96 100644 --- a/application/src/main/java/org/thingsboard/server/service/security/AccessValidator.java +++ b/application/src/main/java/org/thingsboard/server/service/security/AccessValidator.java @@ -24,6 +24,7 @@ import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; import org.springframework.stereotype.Component; import org.springframework.web.context.request.async.DeferredResult; +import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.server.common.data.Customer; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.EntityView; @@ -105,7 +106,7 @@ public class AccessValidator { @PostConstruct public void initExecutor() { - executor = Executors.newSingleThreadExecutor(); + executor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("access-validator")); } @PreDestroy diff --git a/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java b/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java index ac0644719d..f162d05419 100644 --- a/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java @@ -31,6 +31,7 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Service; import org.springframework.util.StringUtils; +import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.server.actors.service.ActorService; import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.Device; @@ -144,7 +145,7 @@ public class DefaultDeviceStateService implements DeviceStateService { @PostConstruct public void init() { // Should be always single threaded due to absence of locks. - queueExecutor = MoreExecutors.listeningDecorator(Executors.newSingleThreadScheduledExecutor()); + queueExecutor = MoreExecutors.listeningDecorator(Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("device-state"))); queueExecutor.submit(this::initStateFromDB); queueExecutor.scheduleAtFixedRate(this::updateState, new Random().nextInt(defaultStateCheckIntervalInSec), defaultStateCheckIntervalInSec, TimeUnit.SECONDS); //TODO: schedule persistence in v2.1; diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java index c4ddcf654b..a829fa371b 100644 --- a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java @@ -24,6 +24,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Service; import org.springframework.util.StringUtils; +import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.rule.engine.api.msg.DeviceAttributesEventNotificationMsg; import org.thingsboard.common.util.DonAsynchron; import org.thingsboard.server.actors.service.ActorService; @@ -110,8 +111,8 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio @PostConstruct public void initExecutor() { - tsCallBackExecutor = Executors.newSingleThreadExecutor(); - wsCallBackExecutor = Executors.newSingleThreadExecutor(); + tsCallBackExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("ts-sub-callback")); + wsCallBackExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("ws-sub-callback")); } @PreDestroy diff --git a/application/src/main/java/org/thingsboard/server/service/transaction/BaseRuleChainTransactionService.java b/application/src/main/java/org/thingsboard/server/service/transaction/BaseRuleChainTransactionService.java index b40e2b93fd..080478c4c4 100644 --- a/application/src/main/java/org/thingsboard/server/service/transaction/BaseRuleChainTransactionService.java +++ b/application/src/main/java/org/thingsboard/server/service/transaction/BaseRuleChainTransactionService.java @@ -19,6 +19,7 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; +import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.rule.engine.api.RuleChainTransactionService; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.msg.TbMsg; @@ -71,7 +72,7 @@ public class BaseRuleChainTransactionService implements RuleChainTransactionServ @PostConstruct public void init() { - timeoutExecutor = Executors.newSingleThreadExecutor(); + timeoutExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("rule-chain-transaction")); executeOnTimeout(); } diff --git a/application/src/main/java/org/thingsboard/server/service/transport/RemoteRuleEngineTransportService.java b/application/src/main/java/org/thingsboard/server/service/transport/RemoteRuleEngineTransportService.java index 1c1c7be483..d2c18b2406 100644 --- a/application/src/main/java/org/thingsboard/server/service/transport/RemoteRuleEngineTransportService.java +++ b/application/src/main/java/org/thingsboard/server/service/transport/RemoteRuleEngineTransportService.java @@ -32,6 +32,7 @@ import org.springframework.boot.context.event.ApplicationReadyEvent; import org.springframework.context.event.EventListener; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; +import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.server.actors.ActorSystemContext; import org.thingsboard.server.common.msg.cluster.ServerAddress; import org.thingsboard.server.gen.transport.TransportProtos.DeviceActorToTransportMsg; @@ -103,7 +104,7 @@ public class RemoteRuleEngineTransportService implements RuleEngineTransportServ private TBKafkaConsumerTemplate ruleEngineConsumer; private TBKafkaProducerTemplate notificationsProducer; - private ExecutorService mainConsumerExecutor = Executors.newSingleThreadExecutor(); + private ExecutorService mainConsumerExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("tb-main-consumer")); private volatile boolean stopped = false; diff --git a/application/src/main/java/org/thingsboard/server/service/update/DefaultUpdateService.java b/application/src/main/java/org/thingsboard/server/service/update/DefaultUpdateService.java index 14cbf95861..479f5afcb9 100644 --- a/application/src/main/java/org/thingsboard/server/service/update/DefaultUpdateService.java +++ b/application/src/main/java/org/thingsboard/server/service/update/DefaultUpdateService.java @@ -22,6 +22,7 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import org.springframework.web.client.RestTemplate; +import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.server.common.data.UpdateMessage; import javax.annotation.PostConstruct; @@ -50,7 +51,7 @@ public class DefaultUpdateService implements UpdateService { @Value("${updates.enabled}") private boolean updatesEnabled; - private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); + private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1, ThingsBoardThreadFactory.forName("tb-update-service")); private ScheduledFuture checkUpdatesFuture = null; private RestTemplate restClient = new RestTemplate(); diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index c03240e17f..b3662a60e1 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -195,10 +195,6 @@ cassandra: # SQL configuration parameters sql: - # Specify executor service type used to perform timeseries insert tasks: SINGLE or FIXED - ts_inserts_executor_type: "${SQL_TS_INSERTS_EXECUTOR_TYPE:fixed}" - # Specify thread pool size for FIXED executor service type - ts_inserts_fixed_thread_pool_size: "${SQL_TS_INSERTS_FIXED_THREAD_POOL_SIZE:200}" # Specify batch size for persisting attribute updates attributes: batch_size: "${SQL_ATTRIBUTES_BATCH_SIZE:10000}" diff --git a/common/transport/transport-api/pom.xml b/common/transport/transport-api/pom.xml index dc6cbfd1ba..25a9b456fb 100644 --- a/common/transport/transport-api/pom.xml +++ b/common/transport/transport-api/pom.xml @@ -44,6 +44,10 @@ org.thingsboard.common message + + org.thingsboard.common + util + org.thingsboard.common queue diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/AbstractTransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/AbstractTransportService.java index b09e94202d..bcafa177a4 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/AbstractTransportService.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/AbstractTransportService.java @@ -17,6 +17,7 @@ package org.thingsboard.server.common.transport.service; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; +import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.TenantId; @@ -300,7 +301,7 @@ public abstract class AbstractTransportService implements TransportService { new TbRateLimits(perTenantLimitsConf); new TbRateLimits(perDevicesLimitsConf); } - this.schedulerExecutor = Executors.newSingleThreadScheduledExecutor(); + this.schedulerExecutor = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("transport-scheduler")); this.transportCallbackExecutor = Executors.newWorkStealingPool(20); this.schedulerExecutor.scheduleAtFixedRate(this::checkInactivityAndReportActivity, new Random().nextInt((int) sessionReportTimeout), sessionReportTimeout, TimeUnit.MILLISECONDS); } diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/RemoteTransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/RemoteTransportService.java index 632ce16bc1..5397837b68 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/RemoteTransportService.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/RemoteTransportService.java @@ -25,6 +25,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.stereotype.Service; +import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.server.common.transport.TransportServiceCallback; import org.thingsboard.server.gen.transport.TransportProtos.ClaimDeviceMsg; import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeRequestMsg; @@ -100,7 +101,7 @@ public class RemoteTransportService extends AbstractTransportService { private TBKafkaProducerTemplate ruleEngineProducer; private TBKafkaConsumerTemplate mainConsumer; - private ExecutorService mainConsumerExecutor = Executors.newSingleThreadExecutor(); + private ExecutorService mainConsumerExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("remote-transport-consumer")); private volatile boolean stopped = false; diff --git a/common/util/src/main/java/org/thingsboard/common/util/ThingsBoardThreadFactory.java b/common/util/src/main/java/org/thingsboard/common/util/ThingsBoardThreadFactory.java new file mode 100644 index 0000000000..ff08349bd4 --- /dev/null +++ b/common/util/src/main/java/org/thingsboard/common/util/ThingsBoardThreadFactory.java @@ -0,0 +1,54 @@ +/** + * Copyright © 2016-2019 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.common.util; + +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Copy of Executors.DefaultThreadFactory but with ability to set name of the pool + */ +public class ThingsBoardThreadFactory implements ThreadFactory { + private static final AtomicInteger poolNumber = new AtomicInteger(1); + private final ThreadGroup group; + private final AtomicInteger threadNumber = new AtomicInteger(1); + private final String namePrefix; + + public static ThingsBoardThreadFactory forName(String name) { + return new ThingsBoardThreadFactory(name); + } + + private ThingsBoardThreadFactory(String name) { + SecurityManager s = System.getSecurityManager(); + group = (s != null) ? s.getThreadGroup() : + Thread.currentThread().getThreadGroup(); + namePrefix = name + "-" + + poolNumber.getAndIncrement() + + "-thread-"; + } + + @Override + public Thread newThread(Runnable r) { + Thread t = new Thread(group, r, + namePrefix + threadNumber.getAndIncrement(), + 0); + if (t.isDaemon()) + t.setDaemon(false); + if (t.getPriority() != Thread.NORM_PRIORITY) + t.setPriority(Thread.NORM_PRIORITY); + return t; + } +} diff --git a/dao/src/main/java/org/thingsboard/server/dao/alarm/BaseAlarmService.java b/dao/src/main/java/org/thingsboard/server/dao/alarm/BaseAlarmService.java index 8a4279fe7f..d46445778d 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/alarm/BaseAlarmService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/alarm/BaseAlarmService.java @@ -24,6 +24,7 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.util.StringUtils; +import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.Tenant; import org.thingsboard.server.common.data.alarm.Alarm; @@ -81,7 +82,7 @@ public class BaseAlarmService extends AbstractEntityService implements AlarmServ @PostConstruct public void startExecutor() { - readResultsProcessingExecutor = Executors.newCachedThreadPool(); + readResultsProcessingExecutor = Executors.newCachedThreadPool(ThingsBoardThreadFactory.forName("alarm-service")); } @PreDestroy diff --git a/dao/src/main/java/org/thingsboard/server/dao/audit/CassandraAuditLogDao.java b/dao/src/main/java/org/thingsboard/server/dao/audit/CassandraAuditLogDao.java index 675a24a7f7..d75014a389 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/audit/CassandraAuditLogDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/audit/CassandraAuditLogDao.java @@ -29,6 +29,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.core.env.Environment; import org.springframework.stereotype.Component; +import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.server.common.data.audit.ActionType; import org.thingsboard.server.common.data.audit.AuditLog; import org.thingsboard.server.common.data.id.CustomerId; @@ -117,7 +118,7 @@ public class CassandraAuditLogDao extends CassandraAbstractSearchTimeDao implements TbSqlQueue { @Override public void init(ScheduledLogExecutorComponent logExecutor, Consumer> saveFunction) { this.logExecutor = logExecutor; - executor = Executors.newSingleThreadExecutor(); + executor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("sql-queue-" + params.getLogName().toLowerCase())); executor.submit(() -> { String logName = params.getLogName(); int batchSize = params.getBatchSize(); diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractSqlTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractSqlTimeseriesDao.java index a7efad1e88..15a012df66 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractSqlTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractSqlTimeseriesDao.java @@ -44,44 +44,6 @@ public abstract class AbstractSqlTimeseriesDao extends JpaAbstractDaoListeningEx private static final String DESC_ORDER = "DESC"; - @Value("${sql.ts_inserts_executor_type}") - private String insertExecutorType; - - @Value("${sql.ts_inserts_fixed_thread_pool_size}") - private int insertFixedThreadPoolSize; - - @Value("${spring.datasource.hikari.maximumPoolSize}") - private int maximumPoolSize; - - protected ListeningExecutorService insertService; - - @PostConstruct - void init() { - Optional executorTypeOptional = TsInsertExecutorType.parse(insertExecutorType); - TsInsertExecutorType executorType; - executorType = executorTypeOptional.orElse(TsInsertExecutorType.FIXED); - switch (executorType) { - case SINGLE: - insertService = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor()); - break; - case FIXED: - case CACHED: - int poolSize = insertFixedThreadPoolSize; - if (poolSize <= 0) { - poolSize = maximumPoolSize * 4; - } - insertService = MoreExecutors.listeningDecorator(Executors.newWorkStealingPool(poolSize)); - break; - } - } - - @PreDestroy - void preDestroy() { - if (insertService != null) { - insertService.shutdown(); - } - } - protected ListenableFuture> processFindAllAsync(TenantId tenantId, EntityId entityId, List queries) { List>> futures = queries .stream() diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/TimescaleTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/TimescaleTimeseriesDao.java index 961f545567..cbd2d2cdc8 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/TimescaleTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/TimescaleTimeseriesDao.java @@ -170,12 +170,12 @@ public class TimescaleTimeseriesDao extends AbstractSqlTimeseriesDao implements @Override public ListenableFuture savePartition(TenantId tenantId, EntityId entityId, long tsKvEntryTs, String key, long ttl) { - return insertService.submit(() -> null); + return Futures.immediateFuture(null); } @Override public ListenableFuture saveLatest(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry) { - return insertService.submit(() -> null); + return Futures.immediateFuture(null); } @Override diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/ts/JpaTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/ts/JpaTimeseriesDao.java index 7c198d9a73..71b156b81f 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/ts/JpaTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/ts/JpaTimeseriesDao.java @@ -334,7 +334,7 @@ public class JpaTimeseriesDao extends AbstractSqlTimeseriesDao implements Timese @Override public ListenableFuture savePartition(TenantId tenantId, EntityId entityId, long tsKvEntryTs, String key, long ttl) { - return insertService.submit(() -> null); + return Futures.immediateFuture(null); } @Override diff --git a/dao/src/main/java/org/thingsboard/server/dao/util/AbstractBufferedRateExecutor.java b/dao/src/main/java/org/thingsboard/server/dao/util/AbstractBufferedRateExecutor.java index a553aa5f9c..54fdfbf2d5 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/util/AbstractBufferedRateExecutor.java +++ b/dao/src/main/java/org/thingsboard/server/dao/util/AbstractBufferedRateExecutor.java @@ -21,6 +21,7 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import lombok.extern.slf4j.Slf4j; +import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.msg.tools.TbRateLimits; import org.thingsboard.server.dao.nosql.CassandraStatementTask; @@ -67,9 +68,9 @@ public abstract class AbstractBufferedRateExecutor(queueLimit); - this.dispatcherExecutor = Executors.newFixedThreadPool(dispatcherThreads); + this.dispatcherExecutor = Executors.newFixedThreadPool(dispatcherThreads, ThingsBoardThreadFactory.forName("nosql-dispatcher")); this.callbackExecutor = Executors.newWorkStealingPool(callbackThreads); - this.timeoutExecutor = Executors.newSingleThreadScheduledExecutor(); + this.timeoutExecutor = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("nosql-timeout")); this.perTenantLimitsEnabled = perTenantLimitsEnabled; this.perTenantLimitsConfiguration = perTenantLimitsConfiguration; for (int i = 0; i < dispatcherThreads; i++) {