diff --git a/application/src/main/java/org/thingsboard/server/service/executors/PubSubRuleNodeExecutorProvider.java b/application/src/main/java/org/thingsboard/server/service/executors/PubSubRuleNodeExecutorProvider.java index 7db74f6195..f730949f45 100644 --- a/application/src/main/java/org/thingsboard/server/service/executors/PubSubRuleNodeExecutorProvider.java +++ b/application/src/main/java/org/thingsboard/server/service/executors/PubSubRuleNodeExecutorProvider.java @@ -1,3 +1,18 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.thingsboard.server.service.executors; import org.springframework.stereotype.Component; @@ -5,6 +20,7 @@ import org.thingsboard.common.util.ExecutorProvider; import org.thingsboard.common.util.ThingsBoardThreadFactory; import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -26,4 +42,11 @@ public class PubSubRuleNodeExecutorProvider implements ExecutorProvider { public ScheduledExecutorService getExecutor() { return executor; } + + @PreDestroy + private void destroy() { + if (executor != null) { + executor.shutdown(); + } + } } diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index ccc4953793..802756c105 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -1412,7 +1412,7 @@ queue: # Number of messages per consumer max_messages: "${TB_QUEUE_PUBSUB_MAX_MESSAGES:1000}" # Number of threads of pubsub executor provider - executor_thread_pool_size: "${TB_QUEUE_PUBSUB_EXECUTOR_THREAD_SIZE:}" + executor_thread_pool_size: "${TB_QUEUE_PUBSUB_EXECUTOR_THREAD_POOL_SIZE:}" queue-properties: # Pub/Sub properties for Rule Engine subscribers, messages which will commit after ackDeadlineInSec period can be consumed again rule-engine: "${TB_QUEUE_PUBSUB_RE_QUEUE_PROPERTIES:ackDeadlineInSec:30;messageRetentionInSec:604800}" diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/pubsub/TbPubSubQueueExecutorProvider.java b/common/queue/src/main/java/org/thingsboard/server/queue/pubsub/TbPubSubQueueExecutorProvider.java index 2b636b5ee2..4de75faa56 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/pubsub/TbPubSubQueueExecutorProvider.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/pubsub/TbPubSubQueueExecutorProvider.java @@ -1,3 +1,18 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.thingsboard.server.queue.pubsub; import org.springframework.beans.factory.annotation.Value; @@ -7,6 +22,7 @@ import org.thingsboard.common.util.ExecutorProvider; import org.thingsboard.common.util.ThingsBoardThreadFactory; import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -34,4 +50,11 @@ public class TbPubSubQueueExecutorProvider implements ExecutorProvider { public ScheduledExecutorService getExecutor() { return executor; } + + @PreDestroy + private void destroy() { + if (executor != null) { + executor.shutdown(); + } + } } diff --git a/common/util/src/main/java/org/thingsboard/common/util/ExecutorProvider.java b/common/util/src/main/java/org/thingsboard/common/util/ExecutorProvider.java index 4abc46fb16..61379c8c89 100644 --- a/common/util/src/main/java/org/thingsboard/common/util/ExecutorProvider.java +++ b/common/util/src/main/java/org/thingsboard/common/util/ExecutorProvider.java @@ -1,3 +1,18 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.thingsboard.common.util; import java.util.concurrent.ScheduledExecutorService; diff --git a/msa/vc-executor/src/main/resources/tb-vc-executor.yml b/msa/vc-executor/src/main/resources/tb-vc-executor.yml index 2353199c2b..9fc9b884f0 100644 --- a/msa/vc-executor/src/main/resources/tb-vc-executor.yml +++ b/msa/vc-executor/src/main/resources/tb-vc-executor.yml @@ -173,7 +173,7 @@ queue: # Number of messages per consumer max_messages: "${TB_QUEUE_PUBSUB_MAX_MESSAGES:1000}" # Number of threads of pubsub executor provider - executor_thread_pool_size: "${TB_QUEUE_PUBSUB_EXECUTOR_THREAD_SIZE:}" + executor_thread_pool_size: "${TB_QUEUE_PUBSUB_EXECUTOR_THREAD_POOL_SIZE:}" queue-properties: # Pub/Sub properties for Core subscribers, messages which will commit after ackDeadlineInSec period can be consumed again core: "${TB_QUEUE_PUBSUB_CORE_QUEUE_PROPERTIES:ackDeadlineInSec:30;messageRetentionInSec:604800}" diff --git a/transport/coap/src/main/resources/tb-coap-transport.yml b/transport/coap/src/main/resources/tb-coap-transport.yml index 5588384efd..27e5c8b273 100644 --- a/transport/coap/src/main/resources/tb-coap-transport.yml +++ b/transport/coap/src/main/resources/tb-coap-transport.yml @@ -296,7 +296,7 @@ queue: # Number of messages per consumer max_messages: "${TB_QUEUE_PUBSUB_MAX_MESSAGES:1000}" # Number of threads of pubsub executor provider - executor_thread_pool_size: "${TB_QUEUE_PUBSUB_EXECUTOR_THREAD_SIZE:}" + executor_thread_pool_size: "${TB_QUEUE_PUBSUB_EXECUTOR_THREAD_POOL_SIZE:}" queue-properties: # Pub/Sub properties for Rule Engine subscribers, messages which will commit after ackDeadlineInSec period can be consumed again rule-engine: "${TB_QUEUE_PUBSUB_RE_QUEUE_PROPERTIES:ackDeadlineInSec:30;messageRetentionInSec:604800}" diff --git a/transport/http/src/main/resources/tb-http-transport.yml b/transport/http/src/main/resources/tb-http-transport.yml index 2ca2a04d1f..b8070f6575 100644 --- a/transport/http/src/main/resources/tb-http-transport.yml +++ b/transport/http/src/main/resources/tb-http-transport.yml @@ -279,7 +279,7 @@ queue: # Number of messages per a consumer max_messages: "${TB_QUEUE_PUBSUB_MAX_MESSAGES:1000}" # Number of threads of pubsub executor provider - executor_thread_pool_size: "${TB_QUEUE_PUBSUB_EXECUTOR_THREAD_SIZE:}" + executor_thread_pool_size: "${TB_QUEUE_PUBSUB_EXECUTOR_THREAD_POOL_SIZE:}" queue-properties: # Pub/Sub properties for Rule Engine subscribers, messages which will commit after ackDeadlineInSec period can be consume again rule-engine: "${TB_QUEUE_PUBSUB_RE_QUEUE_PROPERTIES:ackDeadlineInSec:30;messageRetentionInSec:604800}" diff --git a/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml b/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml index d8ee8d14c4..b1d40c720f 100644 --- a/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml +++ b/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml @@ -375,7 +375,7 @@ queue: # Number of messages per consumer max_messages: "${TB_QUEUE_PUBSUB_MAX_MESSAGES:1000}" # Number of threads of pubsub executor provider - executor_thread_pool_size: "${TB_QUEUE_PUBSUB_EXECUTOR_THREAD_SIZE:}" + executor_thread_pool_size: "${TB_QUEUE_PUBSUB_EXECUTOR_THREAD_POOL_SIZE:}" queue-properties: # Pub/Sub properties for Rule Engine subscribers, messages which will commit after ackDeadlineInSec period can be consumed again rule-engine: "${TB_QUEUE_PUBSUB_RE_QUEUE_PROPERTIES:ackDeadlineInSec:30;messageRetentionInSec:604800}" diff --git a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml index 6dc1213884..4d130eecd9 100644 --- a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml +++ b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml @@ -312,7 +312,7 @@ queue: # Number of messages per consumer max_messages: "${TB_QUEUE_PUBSUB_MAX_MESSAGES:1000}" # Number of threads of pubsub executor provider - executor_thread_pool_size: "${TB_QUEUE_PUBSUB_EXECUTOR_THREAD_SIZE:}" + executor_thread_pool_size: "${TB_QUEUE_PUBSUB_EXECUTOR_THREAD_POOL_SIZE:}" queue-properties: # Pub/Sub properties for Rule Engine subscribers, messages which will commit after ackDeadlineInSec period can be consumed again rule-engine: "${TB_QUEUE_PUBSUB_RE_QUEUE_PROPERTIES:ackDeadlineInSec:30;messageRetentionInSec:604800}" diff --git a/transport/snmp/src/main/resources/tb-snmp-transport.yml b/transport/snmp/src/main/resources/tb-snmp-transport.yml index 928c3c0642..2b89065a55 100644 --- a/transport/snmp/src/main/resources/tb-snmp-transport.yml +++ b/transport/snmp/src/main/resources/tb-snmp-transport.yml @@ -265,7 +265,7 @@ queue: # Number of messages per consumer max_messages: "${TB_QUEUE_PUBSUB_MAX_MESSAGES:1000}" # Number of threads of pubsub executor provider - executor_thread_pool_size: "${TB_QUEUE_PUBSUB_EXECUTOR_THREAD_SIZE:}" + executor_thread_pool_size: "${TB_QUEUE_PUBSUB_EXECUTOR_THREAD_POOL_SIZE:}" queue-properties: # Pub/Sub properties for Rule Engine subscribers, messages which will commit after ackDeadlineInSec period can be consumed again rule-engine: "${TB_QUEUE_PUBSUB_RE_QUEUE_PROPERTIES:ackDeadlineInSec:30;messageRetentionInSec:604800}"