Mail Send Timeouts

This commit is contained in:
Andrii Shvaika 2022-07-08 17:35:02 +03:00
parent a2c84e551b
commit 0c5eb6bb3c
5 changed files with 49 additions and 17 deletions

View File

@ -85,10 +85,8 @@ public class DefaultMailService implements MailService {
@Autowired
private MailExecutorService mailExecutorService;
@Value("${actors.rule.mail_timeout_thread_pool_size}")
private int timeoutExecutorPoolSize;
private ExecutorService timeoutExecutorService;
@Autowired
private PasswordResetExecutorService passwordResetExecutorService;
private JavaMailSenderImpl mailSender;
@ -106,14 +104,6 @@ public class DefaultMailService implements MailService {
@PostConstruct
private void init() {
updateMailConfiguration();
timeoutExecutorService = Executors.newFixedThreadPool(timeoutExecutorPoolSize);
}
@PreDestroy
private void destroy() {
if (timeoutExecutorService != null) {
timeoutExecutorService.shutdown();
}
}
@Override
@ -252,7 +242,7 @@ public class DefaultMailService implements MailService {
@Override
public void sendResetPasswordEmailAsync(String passwordResetLink, String email) {
mailExecutorService.execute(() -> {
passwordResetExecutorService.execute(() -> {
try {
this.sendResetPasswordEmail(passwordResetLink, email);
} catch (ThingsboardException e) {
@ -489,9 +479,8 @@ public class DefaultMailService implements MailService {
}
private void sendMailWithTimeout(JavaMailSender mailSender, MimeMessage msg, long timeout) {
var submittedMail = timeoutExecutorService.submit(() -> mailSender.send(msg));
try {
submittedMail.get(timeout, TimeUnit.MILLISECONDS);
mailExecutorService.submit(() -> mailSender.send(msg)).get(timeout, TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
log.debug("Error during mail submission", e);
throw new RuntimeException("Timeout!");

View File

@ -0,0 +1,33 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.mail;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.thingsboard.common.util.AbstractListeningExecutor;
@Component
public class PasswordResetExecutorService extends AbstractListeningExecutor {
@Value("${actors.rule.mail_password_reset_thread_pool_size:10}")
private int mailExecutorThreadPoolSize;
@Override
protected int getThreadPollSize() {
return mailExecutorThreadPoolSize;
}
}

View File

@ -327,8 +327,8 @@ actors:
js_thread_pool_size: "${ACTORS_RULE_JS_THREAD_POOL_SIZE:50}"
# Specify thread pool size for mail sender executor service
mail_thread_pool_size: "${ACTORS_RULE_MAIL_THREAD_POOL_SIZE:40}"
# Specify thread pool size for mail sender executor service
mail_timeout_thread_pool_size: "${ACTORS_RULE_MAIL_TIMEOUT_THREAD_POOL_SIZE:10}"
# Specify thread pool size for password reset emails
mail_password_reset_thread_pool_size: "${ACTORS_RULE_MAIL_PASSWORD_RESET_THREAD_POOL_SIZE:10}"
# Specify thread pool size for sms sender executor service
sms_thread_pool_size: "${ACTORS_RULE_SMS_THREAD_POOL_SIZE:50}"
# Whether to allow usage of system mail service for rules

View File

@ -49,6 +49,10 @@ public abstract class AbstractListeningExecutor implements ListeningExecutor {
return service.submit(task);
}
public ListenableFuture<?> executeAsync(Runnable task) {
return service.submit(task);
}
@Override
public void execute(Runnable command) {
service.execute(command);

View File

@ -24,8 +24,14 @@ public interface ListeningExecutor extends Executor {
<T> ListenableFuture<T> executeAsync(Callable<T> task);
ListenableFuture<?> executeAsync(Runnable task);
default <T> ListenableFuture<T> submit(Callable<T> task) {
return executeAsync(task);
}
default ListenableFuture<?> submit(Runnable task) {
return executeAsync(task);
}
}