add timeoutExecutor to prevent deadlocks; fix not using timeout from configuration in TbSendEmailNode
This commit is contained in:
parent
8efb0dddab
commit
33b502e156
@ -23,6 +23,7 @@ import lombok.extern.slf4j.Slf4j;
|
|||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.apache.commons.lang3.exception.ExceptionUtils;
|
import org.apache.commons.lang3.exception.ExceptionUtils;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
import org.springframework.context.MessageSource;
|
import org.springframework.context.MessageSource;
|
||||||
import org.springframework.context.annotation.Lazy;
|
import org.springframework.context.annotation.Lazy;
|
||||||
import org.springframework.core.NestedRuntimeException;
|
import org.springframework.core.NestedRuntimeException;
|
||||||
@ -34,6 +35,7 @@ import org.springframework.stereotype.Service;
|
|||||||
import org.springframework.ui.freemarker.FreeMarkerTemplateUtils;
|
import org.springframework.ui.freemarker.FreeMarkerTemplateUtils;
|
||||||
import org.thingsboard.rule.engine.api.MailService;
|
import org.thingsboard.rule.engine.api.MailService;
|
||||||
import org.thingsboard.rule.engine.api.TbEmail;
|
import org.thingsboard.rule.engine.api.TbEmail;
|
||||||
|
import org.thingsboard.rule.engine.mail.TbSendEmailNodeConfiguration;
|
||||||
import org.thingsboard.server.common.data.AdminSettings;
|
import org.thingsboard.server.common.data.AdminSettings;
|
||||||
import org.thingsboard.server.common.data.ApiFeature;
|
import org.thingsboard.server.common.data.ApiFeature;
|
||||||
import org.thingsboard.server.common.data.ApiUsageRecordKey;
|
import org.thingsboard.server.common.data.ApiUsageRecordKey;
|
||||||
@ -49,6 +51,7 @@ import org.thingsboard.server.queue.usagestats.TbApiUsageClient;
|
|||||||
import org.thingsboard.server.service.apiusage.TbApiUsageStateService;
|
import org.thingsboard.server.service.apiusage.TbApiUsageStateService;
|
||||||
|
|
||||||
import javax.annotation.PostConstruct;
|
import javax.annotation.PostConstruct;
|
||||||
|
import javax.annotation.PreDestroy;
|
||||||
import javax.mail.MessagingException;
|
import javax.mail.MessagingException;
|
||||||
import javax.mail.internet.MimeMessage;
|
import javax.mail.internet.MimeMessage;
|
||||||
import java.io.ByteArrayInputStream;
|
import java.io.ByteArrayInputStream;
|
||||||
@ -56,6 +59,8 @@ import java.util.HashMap;
|
|||||||
import java.util.Locale;
|
import java.util.Locale;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Properties;
|
import java.util.Properties;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.TimeoutException;
|
import java.util.concurrent.TimeoutException;
|
||||||
|
|
||||||
@ -83,6 +88,11 @@ public class DefaultMailService implements MailService {
|
|||||||
@Autowired
|
@Autowired
|
||||||
private MailExecutorService mailExecutorService;
|
private MailExecutorService mailExecutorService;
|
||||||
|
|
||||||
|
@Value("${actors.rule.mail_timeout_thread_pool_size}")
|
||||||
|
private int timeoutExecutorPoolSize;
|
||||||
|
|
||||||
|
private ExecutorService timeoutExecutorService;
|
||||||
|
|
||||||
private JavaMailSenderImpl mailSender;
|
private JavaMailSenderImpl mailSender;
|
||||||
|
|
||||||
private String mailFrom;
|
private String mailFrom;
|
||||||
@ -99,6 +109,14 @@ public class DefaultMailService implements MailService {
|
|||||||
@PostConstruct
|
@PostConstruct
|
||||||
private void init() {
|
private void init() {
|
||||||
updateMailConfiguration();
|
updateMailConfiguration();
|
||||||
|
timeoutExecutorService = Executors.newFixedThreadPool(timeoutExecutorPoolSize);
|
||||||
|
}
|
||||||
|
|
||||||
|
@PreDestroy
|
||||||
|
private void destroy() {
|
||||||
|
if (timeoutExecutorService != null) {
|
||||||
|
timeoutExecutorService.shutdown();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -262,15 +280,15 @@ public class DefaultMailService implements MailService {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void send(TenantId tenantId, CustomerId customerId, TbEmail tbEmail) throws ThingsboardException {
|
public void send(TenantId tenantId, CustomerId customerId, TbEmail tbEmail) throws ThingsboardException {
|
||||||
sendMail(tenantId, customerId, tbEmail, this.mailSender);
|
sendMail(tenantId, customerId, tbEmail, this.mailSender, timeout);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void send(TenantId tenantId, CustomerId customerId, TbEmail tbEmail, JavaMailSender javaMailSender) throws ThingsboardException {
|
public void send(TenantId tenantId, CustomerId customerId, TbEmail tbEmail, JavaMailSender javaMailSender, long timeout) throws ThingsboardException {
|
||||||
sendMail(tenantId, customerId, tbEmail, javaMailSender);
|
sendMail(tenantId, customerId, tbEmail, javaMailSender, timeout);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void sendMail(TenantId tenantId, CustomerId customerId, TbEmail tbEmail, JavaMailSender javaMailSender) throws ThingsboardException {
|
private void sendMail(TenantId tenantId, CustomerId customerId, TbEmail tbEmail, JavaMailSender javaMailSender, long timeout) throws ThingsboardException {
|
||||||
if (apiUsageStateService.getApiUsageState(tenantId).isEmailSendEnabled()) {
|
if (apiUsageStateService.getApiUsageState(tenantId).isEmailSendEnabled()) {
|
||||||
try {
|
try {
|
||||||
MimeMessage mailMsg = javaMailSender.createMimeMessage();
|
MimeMessage mailMsg = javaMailSender.createMimeMessage();
|
||||||
@ -474,10 +492,11 @@ public class DefaultMailService implements MailService {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void sendMailWithTimeout(JavaMailSender mailSender, MimeMessage msg, long timeout) {
|
private void sendMailWithTimeout(JavaMailSender mailSender, MimeMessage msg, long timeout) {
|
||||||
var submittedMail = Futures.submit(() -> mailSender.send(msg), mailExecutorService);
|
var submittedMail = timeoutExecutorService.submit(() -> mailSender.send(msg));
|
||||||
try {
|
try {
|
||||||
submittedMail.get(timeout, TimeUnit.MILLISECONDS);
|
submittedMail.get(timeout, TimeUnit.MILLISECONDS);
|
||||||
} catch (TimeoutException ignored) {
|
} catch (TimeoutException e) {
|
||||||
|
log.debug("Error during mail submission", e);
|
||||||
throw new RuntimeException("Timeout!");
|
throw new RuntimeException("Timeout!");
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
throw new RuntimeException(ExceptionUtils.getRootCause(e));
|
throw new RuntimeException(ExceptionUtils.getRootCause(e));
|
||||||
|
|||||||
@ -326,7 +326,9 @@ actors:
|
|||||||
# Specify thread pool size for javascript executor service
|
# Specify thread pool size for javascript executor service
|
||||||
js_thread_pool_size: "${ACTORS_RULE_JS_THREAD_POOL_SIZE:50}"
|
js_thread_pool_size: "${ACTORS_RULE_JS_THREAD_POOL_SIZE:50}"
|
||||||
# Specify thread pool size for mail sender executor service
|
# Specify thread pool size for mail sender executor service
|
||||||
mail_thread_pool_size: "${ACTORS_RULE_MAIL_THREAD_POOL_SIZE:50}"
|
mail_thread_pool_size: "${ACTORS_RULE_MAIL_THREAD_POOL_SIZE:40}"
|
||||||
|
# Specify thread pool size for mail sender executor service
|
||||||
|
mail_timeout_thread_pool_size: "${ACTORS_RULE_MAIL_TIMEOUT_THREAD_POOL_SIZE:10}"
|
||||||
# Specify thread pool size for sms sender executor service
|
# Specify thread pool size for sms sender executor service
|
||||||
sms_thread_pool_size: "${ACTORS_RULE_SMS_THREAD_POOL_SIZE:50}"
|
sms_thread_pool_size: "${ACTORS_RULE_SMS_THREAD_POOL_SIZE:50}"
|
||||||
# Whether to allow usage of system mail service for rules
|
# Whether to allow usage of system mail service for rules
|
||||||
|
|||||||
@ -47,8 +47,7 @@ public interface MailService {
|
|||||||
void sendTwoFaVerificationEmail(String email, String verificationCode, int expirationTimeSeconds) throws ThingsboardException;
|
void sendTwoFaVerificationEmail(String email, String verificationCode, int expirationTimeSeconds) throws ThingsboardException;
|
||||||
|
|
||||||
void send(TenantId tenantId, CustomerId customerId, TbEmail tbEmail) throws ThingsboardException;
|
void send(TenantId tenantId, CustomerId customerId, TbEmail tbEmail) throws ThingsboardException;
|
||||||
|
void send(TenantId tenantId, CustomerId customerId, TbEmail tbEmail, JavaMailSender javaMailSender, long timeout) throws ThingsboardException;
|
||||||
void send(TenantId tenantId, CustomerId customerId, TbEmail tbEmail, JavaMailSender javaMailSender) throws ThingsboardException;
|
|
||||||
|
|
||||||
void sendApiFeatureStateEmail(ApiFeature apiFeature, ApiUsageStateValue stateValue, String email, ApiUsageStateMailMessage msg) throws ThingsboardException;
|
void sendApiFeatureStateEmail(ApiFeature apiFeature, ApiUsageStateValue stateValue, String email, ApiUsageStateMailMessage msg) throws ThingsboardException;
|
||||||
|
|
||||||
|
|||||||
@ -88,7 +88,7 @@ public class TbSendEmailNode implements TbNode {
|
|||||||
if (this.config.isUseSystemSmtpSettings()) {
|
if (this.config.isUseSystemSmtpSettings()) {
|
||||||
ctx.getMailService(true).send(ctx.getTenantId(), msg.getCustomerId(), email);
|
ctx.getMailService(true).send(ctx.getTenantId(), msg.getCustomerId(), email);
|
||||||
} else {
|
} else {
|
||||||
ctx.getMailService(false).send(ctx.getTenantId(), msg.getCustomerId(), email, this.mailSender);
|
ctx.getMailService(false).send(ctx.getTenantId(), msg.getCustomerId(), email, this.mailSender, config.getTimeout());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user