Notification system - less async operations
This commit is contained in:
		
							parent
							
								
									67656a2757
								
							
						
					
					
						commit
						f5cd8a9a52
					
				@ -61,7 +61,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceM
 | 
			
		||||
import org.thingsboard.server.gen.transport.TransportProtos.UsageStatsKVProto;
 | 
			
		||||
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
 | 
			
		||||
import org.thingsboard.server.queue.discovery.PartitionService;
 | 
			
		||||
import org.thingsboard.server.queue.notification.NotificationRuleProcessor;
 | 
			
		||||
import org.thingsboard.server.common.msg.notification.NotificationRuleProcessor;
 | 
			
		||||
import org.thingsboard.server.service.apiusage.BaseApiUsageState.StatsCalculationResult;
 | 
			
		||||
import org.thingsboard.server.service.executors.DbCallbackExecutorService;
 | 
			
		||||
import org.thingsboard.server.service.mail.MailExecutorService;
 | 
			
		||||
 | 
			
		||||
@ -15,13 +15,10 @@
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.server.service.notification;
 | 
			
		||||
 | 
			
		||||
import com.google.common.util.concurrent.Futures;
 | 
			
		||||
import com.google.common.util.concurrent.ListenableFuture;
 | 
			
		||||
import lombok.RequiredArgsConstructor;
 | 
			
		||||
import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
import org.springframework.beans.factory.annotation.Autowired;
 | 
			
		||||
import org.springframework.stereotype.Service;
 | 
			
		||||
import org.thingsboard.common.util.DonAsynchron;
 | 
			
		||||
import org.thingsboard.rule.engine.api.NotificationCenter;
 | 
			
		||||
import org.thingsboard.server.common.data.EntityType;
 | 
			
		||||
import org.thingsboard.server.common.data.User;
 | 
			
		||||
@ -59,14 +56,12 @@ import org.thingsboard.server.dao.notification.NotificationService;
 | 
			
		||||
import org.thingsboard.server.dao.notification.NotificationSettingsService;
 | 
			
		||||
import org.thingsboard.server.dao.notification.NotificationTargetService;
 | 
			
		||||
import org.thingsboard.server.dao.notification.NotificationTemplateService;
 | 
			
		||||
import org.thingsboard.server.dao.user.UserService;
 | 
			
		||||
import org.thingsboard.server.dao.util.limits.LimitedApi;
 | 
			
		||||
import org.thingsboard.server.dao.util.limits.RateLimitService;
 | 
			
		||||
import org.thingsboard.server.gen.transport.TransportProtos;
 | 
			
		||||
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
 | 
			
		||||
import org.thingsboard.server.queue.discovery.NotificationsTopicService;
 | 
			
		||||
import org.thingsboard.server.queue.provider.TbQueueProducerProvider;
 | 
			
		||||
import org.thingsboard.server.dao.util.limits.LimitedApi;
 | 
			
		||||
import org.thingsboard.server.dao.util.limits.RateLimitService;
 | 
			
		||||
import org.thingsboard.server.service.executors.DbCallbackExecutorService;
 | 
			
		||||
import org.thingsboard.server.service.executors.NotificationExecutorService;
 | 
			
		||||
import org.thingsboard.server.service.notification.channels.NotificationChannel;
 | 
			
		||||
import org.thingsboard.server.service.subscription.TbSubscriptionUtils;
 | 
			
		||||
@ -74,7 +69,6 @@ import org.thingsboard.server.service.telemetry.AbstractSubscriptionService;
 | 
			
		||||
import org.thingsboard.server.service.ws.notification.sub.NotificationRequestUpdate;
 | 
			
		||||
import org.thingsboard.server.service.ws.notification.sub.NotificationUpdate;
 | 
			
		||||
 | 
			
		||||
import java.util.ArrayList;
 | 
			
		||||
import java.util.Collections;
 | 
			
		||||
import java.util.HashSet;
 | 
			
		||||
import java.util.List;
 | 
			
		||||
@ -87,7 +81,7 @@ import java.util.stream.Collectors;
 | 
			
		||||
@Service
 | 
			
		||||
@Slf4j
 | 
			
		||||
@RequiredArgsConstructor
 | 
			
		||||
@SuppressWarnings({"UnstableApiUsage", "rawtypes"})
 | 
			
		||||
@SuppressWarnings({"rawtypes"})
 | 
			
		||||
public class DefaultNotificationCenter extends AbstractSubscriptionService implements NotificationCenter, NotificationChannel<User, WebDeliveryMethodNotificationTemplate> {
 | 
			
		||||
 | 
			
		||||
    private final NotificationTargetService notificationTargetService;
 | 
			
		||||
@ -95,9 +89,7 @@ public class DefaultNotificationCenter extends AbstractSubscriptionService imple
 | 
			
		||||
    private final NotificationService notificationService;
 | 
			
		||||
    private final NotificationTemplateService notificationTemplateService;
 | 
			
		||||
    private final NotificationSettingsService notificationSettingsService;
 | 
			
		||||
    private final UserService userService;
 | 
			
		||||
    private final NotificationExecutorService notificationExecutor;
 | 
			
		||||
    private final DbCallbackExecutorService dbCallbackExecutorService;
 | 
			
		||||
    private final NotificationsTopicService notificationsTopicService;
 | 
			
		||||
    private final TbQueueProducerProvider producerProvider;
 | 
			
		||||
    private final RateLimitService rateLimitService;
 | 
			
		||||
@ -172,37 +164,32 @@ public class DefaultNotificationCenter extends AbstractSubscriptionService imple
 | 
			
		||||
                .build();
 | 
			
		||||
 | 
			
		||||
        notificationExecutor.submit(() -> {
 | 
			
		||||
            List<ListenableFuture<Void>> results = new ArrayList<>();
 | 
			
		||||
 | 
			
		||||
            for (NotificationTarget target : targets) {
 | 
			
		||||
                List<ListenableFuture<Void>> result = processForTarget(target, ctx);
 | 
			
		||||
                results.addAll(result);
 | 
			
		||||
                processForTarget(target, ctx);
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            Futures.whenAllComplete(results).run(() -> {
 | 
			
		||||
                NotificationRequestId requestId = ctx.getRequest().getId();
 | 
			
		||||
                log.debug("[{}] Notification request processing is finished", requestId);
 | 
			
		||||
                NotificationRequestStats stats = ctx.getStats();
 | 
			
		||||
                try {
 | 
			
		||||
                    notificationRequestService.updateNotificationRequest(tenantId, requestId, NotificationRequestStatus.SENT, stats);
 | 
			
		||||
                } catch (Exception e) {
 | 
			
		||||
                    log.error("[{}] Failed to update stats for notification request", requestId, e);
 | 
			
		||||
                }
 | 
			
		||||
            NotificationRequestId requestId = ctx.getRequest().getId();
 | 
			
		||||
            log.debug("[{}] Notification request processing is finished", requestId);
 | 
			
		||||
            NotificationRequestStats stats = ctx.getStats();
 | 
			
		||||
            try {
 | 
			
		||||
                notificationRequestService.updateNotificationRequest(tenantId, requestId, NotificationRequestStatus.SENT, stats);
 | 
			
		||||
            } catch (Exception e) {
 | 
			
		||||
                log.error("[{}] Failed to update stats for notification request", requestId, e);
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
                if (callback != null) {
 | 
			
		||||
                    try {
 | 
			
		||||
                        callback.accept(stats);
 | 
			
		||||
                    } catch (Exception e) {
 | 
			
		||||
                        log.error("Failed to process callback for notification request {}", requestId, e);
 | 
			
		||||
                    }
 | 
			
		||||
            if (callback != null) {
 | 
			
		||||
                try {
 | 
			
		||||
                    callback.accept(stats);
 | 
			
		||||
                } catch (Exception e) {
 | 
			
		||||
                    log.error("Failed to process callback for notification request {}", requestId, e);
 | 
			
		||||
                }
 | 
			
		||||
            }, dbCallbackExecutorService);
 | 
			
		||||
            }
 | 
			
		||||
        });
 | 
			
		||||
 | 
			
		||||
        return request;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private List<ListenableFuture<Void>> processForTarget(NotificationTarget target, NotificationProcessingContext ctx) {
 | 
			
		||||
    private void processForTarget(NotificationTarget target, NotificationProcessingContext ctx) {
 | 
			
		||||
        Iterable<? extends NotificationRecipient> recipients;
 | 
			
		||||
        switch (target.getConfiguration().getType()) {
 | 
			
		||||
            case PLATFORM_USERS: {
 | 
			
		||||
@ -231,43 +218,35 @@ public class DefaultNotificationCenter extends AbstractSubscriptionService imple
 | 
			
		||||
        Set<NotificationDeliveryMethod> deliveryMethods = new HashSet<>(ctx.getDeliveryMethods());
 | 
			
		||||
        deliveryMethods.removeIf(deliveryMethod -> !target.getConfiguration().getType().getSupportedDeliveryMethods().contains(deliveryMethod));
 | 
			
		||||
        log.debug("[{}] Processing notification request for {} target ({}) for delivery methods {}", ctx.getRequest().getId(), target.getConfiguration().getType(), target.getId(), deliveryMethods);
 | 
			
		||||
        if (deliveryMethods.isEmpty()) {
 | 
			
		||||
            return;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        List<ListenableFuture<Void>> results = new ArrayList<>();
 | 
			
		||||
        if (!deliveryMethods.isEmpty()) {
 | 
			
		||||
            for (NotificationRecipient recipient : recipients) {
 | 
			
		||||
                for (NotificationDeliveryMethod deliveryMethod : deliveryMethods) {
 | 
			
		||||
                    ListenableFuture<Void> resultFuture = processForRecipient(deliveryMethod, recipient, ctx);
 | 
			
		||||
                    DonAsynchron.withCallback(resultFuture, result -> {
 | 
			
		||||
                        ctx.getStats().reportSent(deliveryMethod, recipient);
 | 
			
		||||
                    }, error -> {
 | 
			
		||||
                        ctx.getStats().reportError(deliveryMethod, error, recipient);
 | 
			
		||||
                    });
 | 
			
		||||
                    results.add(resultFuture);
 | 
			
		||||
        for (NotificationRecipient recipient : recipients) {
 | 
			
		||||
            for (NotificationDeliveryMethod deliveryMethod : deliveryMethods) {
 | 
			
		||||
                try {
 | 
			
		||||
                    processForRecipient(deliveryMethod, recipient, ctx);
 | 
			
		||||
                    ctx.getStats().reportSent(deliveryMethod, recipient);
 | 
			
		||||
                } catch (Exception error) {
 | 
			
		||||
                    ctx.getStats().reportError(deliveryMethod, error, recipient);
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
        return results;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private ListenableFuture<Void> processForRecipient(NotificationDeliveryMethod deliveryMethod, NotificationRecipient recipient, NotificationProcessingContext ctx) {
 | 
			
		||||
    private void processForRecipient(NotificationDeliveryMethod deliveryMethod, NotificationRecipient recipient, NotificationProcessingContext ctx) throws Exception {
 | 
			
		||||
        if (ctx.getStats().contains(deliveryMethod, recipient.getId())) {
 | 
			
		||||
            return Futures.immediateFailedFuture(new AlreadySentException());
 | 
			
		||||
            throw new AlreadySentException();
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        DeliveryMethodNotificationTemplate processedTemplate;
 | 
			
		||||
        try {
 | 
			
		||||
            processedTemplate = ctx.getProcessedTemplate(deliveryMethod, recipient);
 | 
			
		||||
        } catch (Exception e) {
 | 
			
		||||
            return Futures.immediateFailedFuture(e);
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        NotificationChannel notificationChannel = channels.get(deliveryMethod);
 | 
			
		||||
        DeliveryMethodNotificationTemplate processedTemplate = ctx.getProcessedTemplate(deliveryMethod, recipient);
 | 
			
		||||
 | 
			
		||||
        log.trace("[{}] Sending {} notification for recipient {}", ctx.getRequest().getId(), deliveryMethod, recipient);
 | 
			
		||||
        return notificationChannel.sendNotification(recipient, processedTemplate, ctx);
 | 
			
		||||
        notificationChannel.sendNotification(recipient, processedTemplate, ctx);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public ListenableFuture<Void> sendNotification(User recipient, WebDeliveryMethodNotificationTemplate processedTemplate, NotificationProcessingContext ctx) {
 | 
			
		||||
    public void sendNotification(User recipient, WebDeliveryMethodNotificationTemplate processedTemplate, NotificationProcessingContext ctx) throws Exception {
 | 
			
		||||
        NotificationRequest request = ctx.getRequest();
 | 
			
		||||
        Notification notification = Notification.builder()
 | 
			
		||||
                .requestId(request.getId())
 | 
			
		||||
@ -283,14 +262,14 @@ public class DefaultNotificationCenter extends AbstractSubscriptionService imple
 | 
			
		||||
            notification = notificationService.saveNotification(recipient.getTenantId(), notification);
 | 
			
		||||
        } catch (Exception e) {
 | 
			
		||||
            log.error("Failed to create notification for recipient {}", recipient.getId(), e);
 | 
			
		||||
            return Futures.immediateFailedFuture(e);
 | 
			
		||||
            throw e;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        NotificationUpdate update = NotificationUpdate.builder()
 | 
			
		||||
                .created(true)
 | 
			
		||||
                .notification(notification)
 | 
			
		||||
                .build();
 | 
			
		||||
        return onNotificationUpdate(recipient.getTenantId(), recipient.getId(), update);
 | 
			
		||||
        onNotificationUpdate(recipient.getTenantId(), recipient.getId(), update);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
@ -384,13 +363,11 @@ public class DefaultNotificationCenter extends AbstractSubscriptionService imple
 | 
			
		||||
        clusterService.pushMsgToCore(tenantId, notificationRequestId, toCoreMsg, null);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private ListenableFuture<Void> onNotificationUpdate(TenantId tenantId, UserId recipientId, NotificationUpdate update) {
 | 
			
		||||
    private void onNotificationUpdate(TenantId tenantId, UserId recipientId, NotificationUpdate update) {
 | 
			
		||||
        log.trace("Submitting notification update for recipient {}: {}", recipientId, update);
 | 
			
		||||
        return Futures.submit(() -> {
 | 
			
		||||
            forwardToSubscriptionManagerService(tenantId, recipientId, subscriptionManagerService -> {
 | 
			
		||||
                subscriptionManagerService.onNotificationUpdate(tenantId, recipientId, update, TbCallback.EMPTY);
 | 
			
		||||
            }, () -> TbSubscriptionUtils.notificationUpdateToProto(tenantId, recipientId, update));
 | 
			
		||||
        }, wsCallBackExecutor);
 | 
			
		||||
        forwardToSubscriptionManagerService(tenantId, recipientId, subscriptionManagerService -> {
 | 
			
		||||
            subscriptionManagerService.onNotificationUpdate(tenantId, recipientId, update, TbCallback.EMPTY);
 | 
			
		||||
        }, () -> TbSubscriptionUtils.notificationUpdateToProto(tenantId, recipientId, update));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void onNotificationRequestUpdate(TenantId tenantId, NotificationRequestUpdate update) {
 | 
			
		||||
 | 
			
		||||
@ -15,7 +15,6 @@
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.server.service.notification.channels;
 | 
			
		||||
 | 
			
		||||
import com.google.common.util.concurrent.ListenableFuture;
 | 
			
		||||
import lombok.RequiredArgsConstructor;
 | 
			
		||||
import org.springframework.stereotype.Component;
 | 
			
		||||
import org.thingsboard.rule.engine.api.MailService;
 | 
			
		||||
@ -24,7 +23,6 @@ import org.thingsboard.server.common.data.User;
 | 
			
		||||
import org.thingsboard.server.common.data.id.TenantId;
 | 
			
		||||
import org.thingsboard.server.common.data.notification.NotificationDeliveryMethod;
 | 
			
		||||
import org.thingsboard.server.common.data.notification.template.EmailDeliveryMethodNotificationTemplate;
 | 
			
		||||
import org.thingsboard.server.service.mail.MailExecutorService;
 | 
			
		||||
import org.thingsboard.server.service.notification.NotificationProcessingContext;
 | 
			
		||||
 | 
			
		||||
@Component
 | 
			
		||||
@ -32,19 +30,15 @@ import org.thingsboard.server.service.notification.NotificationProcessingContext
 | 
			
		||||
public class EmailNotificationChannel implements NotificationChannel<User, EmailDeliveryMethodNotificationTemplate> {
 | 
			
		||||
 | 
			
		||||
    private final MailService mailService;
 | 
			
		||||
    private final MailExecutorService executor;
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public ListenableFuture<Void> sendNotification(User recipient, EmailDeliveryMethodNotificationTemplate processedTemplate, NotificationProcessingContext ctx) {
 | 
			
		||||
        return executor.submit(() -> {
 | 
			
		||||
            mailService.send(recipient.getTenantId(), null, TbEmail.builder()
 | 
			
		||||
                    .to(recipient.getEmail())
 | 
			
		||||
                    .subject(processedTemplate.getSubject())
 | 
			
		||||
                    .body(processedTemplate.getBody())
 | 
			
		||||
                    .html(true)
 | 
			
		||||
                    .build());
 | 
			
		||||
            return null;
 | 
			
		||||
        });
 | 
			
		||||
    public void sendNotification(User recipient, EmailDeliveryMethodNotificationTemplate processedTemplate, NotificationProcessingContext ctx) throws Exception {
 | 
			
		||||
        mailService.send(recipient.getTenantId(), null, TbEmail.builder()
 | 
			
		||||
                .to(recipient.getEmail())
 | 
			
		||||
                .subject(processedTemplate.getSubject())
 | 
			
		||||
                .body(processedTemplate.getBody())
 | 
			
		||||
                .html(true)
 | 
			
		||||
                .build());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
 | 
			
		||||
@ -15,7 +15,6 @@
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.server.service.notification.channels;
 | 
			
		||||
 | 
			
		||||
import com.google.common.util.concurrent.ListenableFuture;
 | 
			
		||||
import org.thingsboard.server.common.data.id.TenantId;
 | 
			
		||||
import org.thingsboard.server.common.data.notification.NotificationDeliveryMethod;
 | 
			
		||||
import org.thingsboard.server.common.data.notification.targets.NotificationRecipient;
 | 
			
		||||
@ -24,7 +23,7 @@ import org.thingsboard.server.service.notification.NotificationProcessingContext
 | 
			
		||||
 | 
			
		||||
public interface NotificationChannel<R extends NotificationRecipient, T extends DeliveryMethodNotificationTemplate> {
 | 
			
		||||
 | 
			
		||||
    ListenableFuture<Void> sendNotification(R recipient, T processedTemplate, NotificationProcessingContext ctx);
 | 
			
		||||
    void sendNotification(R recipient, T processedTemplate, NotificationProcessingContext ctx) throws Exception;
 | 
			
		||||
 | 
			
		||||
    void check(TenantId tenantId) throws Exception;
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -15,7 +15,6 @@
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.server.service.notification.channels;
 | 
			
		||||
 | 
			
		||||
import com.google.common.util.concurrent.ListenableFuture;
 | 
			
		||||
import lombok.RequiredArgsConstructor;
 | 
			
		||||
import org.springframework.stereotype.Component;
 | 
			
		||||
import org.thingsboard.rule.engine.api.slack.SlackService;
 | 
			
		||||
@ -26,7 +25,6 @@ import org.thingsboard.server.common.data.notification.settings.SlackNotificatio
 | 
			
		||||
import org.thingsboard.server.common.data.notification.targets.slack.SlackConversation;
 | 
			
		||||
import org.thingsboard.server.common.data.notification.template.SlackDeliveryMethodNotificationTemplate;
 | 
			
		||||
import org.thingsboard.server.dao.notification.NotificationSettingsService;
 | 
			
		||||
import org.thingsboard.server.service.executors.ExternalCallExecutorService;
 | 
			
		||||
import org.thingsboard.server.service.notification.NotificationProcessingContext;
 | 
			
		||||
 | 
			
		||||
@Component
 | 
			
		||||
@ -35,15 +33,11 @@ public class SlackNotificationChannel implements NotificationChannel<SlackConver
 | 
			
		||||
 | 
			
		||||
    private final SlackService slackService;
 | 
			
		||||
    private final NotificationSettingsService notificationSettingsService;
 | 
			
		||||
    private final ExternalCallExecutorService executor;
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public ListenableFuture<Void> sendNotification(SlackConversation conversation, SlackDeliveryMethodNotificationTemplate processedTemplate, NotificationProcessingContext ctx) {
 | 
			
		||||
    public void sendNotification(SlackConversation conversation, SlackDeliveryMethodNotificationTemplate processedTemplate, NotificationProcessingContext ctx) throws Exception {
 | 
			
		||||
        SlackNotificationDeliveryMethodConfig config = ctx.getDeliveryMethodConfig(NotificationDeliveryMethod.SLACK);
 | 
			
		||||
        return executor.submit(() -> {
 | 
			
		||||
            slackService.sendMessage(ctx.getTenantId(), config.getBotToken(), conversation.getId(), processedTemplate.getBody());
 | 
			
		||||
            return null;
 | 
			
		||||
        });
 | 
			
		||||
        slackService.sendMessage(ctx.getTenantId(), config.getBotToken(), conversation.getId(), processedTemplate.getBody());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
 | 
			
		||||
@ -15,8 +15,6 @@
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.server.service.notification.channels;
 | 
			
		||||
 | 
			
		||||
import com.google.common.util.concurrent.Futures;
 | 
			
		||||
import com.google.common.util.concurrent.ListenableFuture;
 | 
			
		||||
import lombok.RequiredArgsConstructor;
 | 
			
		||||
import org.apache.commons.lang3.StringUtils;
 | 
			
		||||
import org.springframework.stereotype.Component;
 | 
			
		||||
@ -26,26 +24,21 @@ import org.thingsboard.server.common.data.id.TenantId;
 | 
			
		||||
import org.thingsboard.server.common.data.notification.NotificationDeliveryMethod;
 | 
			
		||||
import org.thingsboard.server.common.data.notification.template.SmsDeliveryMethodNotificationTemplate;
 | 
			
		||||
import org.thingsboard.server.service.notification.NotificationProcessingContext;
 | 
			
		||||
import org.thingsboard.server.service.sms.SmsExecutorService;
 | 
			
		||||
 | 
			
		||||
@Component
 | 
			
		||||
@RequiredArgsConstructor
 | 
			
		||||
public class SmsNotificationChannel implements NotificationChannel<User, SmsDeliveryMethodNotificationTemplate> {
 | 
			
		||||
 | 
			
		||||
    private final SmsService smsService;
 | 
			
		||||
    private final SmsExecutorService executor;
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public ListenableFuture<Void> sendNotification(User recipient, SmsDeliveryMethodNotificationTemplate processedTemplate, NotificationProcessingContext ctx) {
 | 
			
		||||
    public void sendNotification(User recipient, SmsDeliveryMethodNotificationTemplate processedTemplate, NotificationProcessingContext ctx) throws Exception {
 | 
			
		||||
        String phone = recipient.getPhone();
 | 
			
		||||
        if (StringUtils.isBlank(phone)) {
 | 
			
		||||
            return Futures.immediateFailedFuture(new RuntimeException("User does not have phone number"));
 | 
			
		||||
            throw new RuntimeException("User does not have phone number");
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        return executor.submit(() -> {
 | 
			
		||||
            smsService.sendSms(recipient.getTenantId(), recipient.getCustomerId(), new String[]{phone}, processedTemplate.getBody());
 | 
			
		||||
            return null;
 | 
			
		||||
        });
 | 
			
		||||
        smsService.sendSms(recipient.getTenantId(), recipient.getCustomerId(), new String[]{phone}, processedTemplate.getBody());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
 | 
			
		||||
@ -15,10 +15,11 @@
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.server.service.notification.rule;
 | 
			
		||||
 | 
			
		||||
import lombok.Data;
 | 
			
		||||
import lombok.RequiredArgsConstructor;
 | 
			
		||||
import lombok.Setter;
 | 
			
		||||
import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
import org.springframework.beans.factory.annotation.Autowired;
 | 
			
		||||
import org.springframework.boot.context.properties.ConfigurationProperties;
 | 
			
		||||
import org.springframework.cache.Cache;
 | 
			
		||||
import org.springframework.cache.CacheManager;
 | 
			
		||||
import org.springframework.context.annotation.Lazy;
 | 
			
		||||
@ -38,29 +39,27 @@ import org.thingsboard.server.common.data.notification.info.NotificationInfo;
 | 
			
		||||
import org.thingsboard.server.common.data.notification.rule.NotificationRule;
 | 
			
		||||
import org.thingsboard.server.common.data.notification.rule.trigger.NotificationRuleTriggerConfig;
 | 
			
		||||
import org.thingsboard.server.common.data.notification.rule.trigger.NotificationRuleTriggerType;
 | 
			
		||||
import org.thingsboard.server.common.data.notification.settings.TriggerTypeConfig;
 | 
			
		||||
import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
 | 
			
		||||
import org.thingsboard.server.common.msg.notification.NotificationRuleProcessor;
 | 
			
		||||
import org.thingsboard.server.common.msg.notification.trigger.NotificationRuleTrigger;
 | 
			
		||||
import org.thingsboard.server.common.msg.notification.trigger.RuleEngineMsgTrigger;
 | 
			
		||||
import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
 | 
			
		||||
import org.thingsboard.server.common.msg.queue.ServiceType;
 | 
			
		||||
import org.thingsboard.server.dao.notification.NotificationRequestService;
 | 
			
		||||
import org.thingsboard.server.dao.util.limits.LimitedApi;
 | 
			
		||||
import org.thingsboard.server.dao.util.limits.RateLimitService;
 | 
			
		||||
import org.thingsboard.server.queue.discovery.PartitionService;
 | 
			
		||||
import org.thingsboard.server.queue.notification.NotificationRuleProcessor;
 | 
			
		||||
import org.thingsboard.server.service.executors.NotificationExecutorService;
 | 
			
		||||
import org.thingsboard.server.service.notification.rule.cache.NotificationRulesCache;
 | 
			
		||||
import org.thingsboard.server.service.notification.rule.trigger.NotificationRuleTriggerProcessor;
 | 
			
		||||
import org.thingsboard.server.service.notification.rule.trigger.RuleEngineMsgNotificationRuleTriggerProcessor;
 | 
			
		||||
 | 
			
		||||
import javax.annotation.PostConstruct;
 | 
			
		||||
import java.io.Serializable;
 | 
			
		||||
import java.util.ArrayList;
 | 
			
		||||
import java.util.Collection;
 | 
			
		||||
import java.util.EnumMap;
 | 
			
		||||
import java.util.HashMap;
 | 
			
		||||
import java.util.List;
 | 
			
		||||
import java.util.Map;
 | 
			
		||||
import java.util.Set;
 | 
			
		||||
import java.util.Optional;
 | 
			
		||||
import java.util.UUID;
 | 
			
		||||
import java.util.stream.Collectors;
 | 
			
		||||
 | 
			
		||||
@ -96,20 +95,27 @@ public class DefaultNotificationRuleProcessor implements NotificationRuleProcess
 | 
			
		||||
    @Override
 | 
			
		||||
    public void process(NotificationRuleTrigger trigger) {
 | 
			
		||||
        NotificationRuleTriggerType triggerType = trigger.getType();
 | 
			
		||||
        if (triggerType == null) return;
 | 
			
		||||
        TenantId tenantId = triggerType.isTenantLevel() ? trigger.getTenantId() : TenantId.SYS_TENANT_ID;
 | 
			
		||||
 | 
			
		||||
        try {
 | 
			
		||||
            List<NotificationRule> rules = notificationRulesCache.getEnabled(tenantId, triggerType);
 | 
			
		||||
            for (NotificationRule rule : rules) {
 | 
			
		||||
                notificationExecutor.submit(() -> {
 | 
			
		||||
            List<NotificationRule> enabledRules = notificationRulesCache.getEnabled(tenantId, triggerType);
 | 
			
		||||
            if (enabledRules.isEmpty()) {
 | 
			
		||||
                return;
 | 
			
		||||
            }
 | 
			
		||||
            if (trigger.deduplicate()) {
 | 
			
		||||
                enabledRules = new ArrayList<>(enabledRules);
 | 
			
		||||
                enabledRules.removeIf(rule -> alreadySent(rule, trigger));
 | 
			
		||||
            }
 | 
			
		||||
            final List<NotificationRule> rules = enabledRules;
 | 
			
		||||
            notificationExecutor.submit(() -> {
 | 
			
		||||
                for (NotificationRule rule : rules) {
 | 
			
		||||
                    try {
 | 
			
		||||
                        processNotificationRule(rule, trigger);
 | 
			
		||||
                    } catch (Throwable e) {
 | 
			
		||||
                        log.error("Failed to process notification rule {} for trigger type {} with trigger object {}", rule.getId(), rule.getTriggerType(), trigger, e);
 | 
			
		||||
                    }
 | 
			
		||||
                });
 | 
			
		||||
            }
 | 
			
		||||
                }
 | 
			
		||||
            });
 | 
			
		||||
        } catch (Throwable e) {
 | 
			
		||||
            log.error("Failed to process notification rules for trigger: {}", trigger, e);
 | 
			
		||||
        }
 | 
			
		||||
@ -172,14 +178,13 @@ public class DefaultNotificationRuleProcessor implements NotificationRuleProcess
 | 
			
		||||
                .ruleId(rule.getId())
 | 
			
		||||
                .originatorEntityId(originatorEntityId)
 | 
			
		||||
                .build();
 | 
			
		||||
        notificationExecutor.submit(() -> {
 | 
			
		||||
            try {
 | 
			
		||||
                log.debug("Submitting notification request for rule '{}' with delay of {} sec to targets {}", rule.getName(), delayInSec, targets);
 | 
			
		||||
                notificationCenter.processNotificationRequest(rule.getTenantId(), notificationRequest, null);
 | 
			
		||||
            } catch (Exception e) {
 | 
			
		||||
                log.error("Failed to process notification request for tenant {} for rule {}", rule.getTenantId(), rule.getId(), e);
 | 
			
		||||
            }
 | 
			
		||||
        });
 | 
			
		||||
 | 
			
		||||
        try {
 | 
			
		||||
            log.debug("Submitting notification request for rule '{}' with delay of {} sec to targets {}", rule.getName(), delayInSec, targets);
 | 
			
		||||
            notificationCenter.processNotificationRequest(rule.getTenantId(), notificationRequest, null);
 | 
			
		||||
        } catch (Exception e) {
 | 
			
		||||
            log.error("Failed to process notification request for tenant {} for rule {}", rule.getTenantId(), rule.getId(), e);
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private boolean matchesFilter(NotificationRuleTrigger trigger, NotificationRuleTriggerConfig triggerConfig) {
 | 
			
		||||
@ -243,24 +248,9 @@ public class DefaultNotificationRuleProcessor implements NotificationRuleProcess
 | 
			
		||||
 | 
			
		||||
    @Autowired
 | 
			
		||||
    public void setTriggerProcessors(Collection<NotificationRuleTriggerProcessor> processors) {
 | 
			
		||||
        Map<String, NotificationRuleTriggerType> ruleEngineMsgTypeToTriggerType = new HashMap<>();
 | 
			
		||||
        processors.forEach(processor -> {
 | 
			
		||||
            triggerProcessors.put(processor.getTriggerType(), processor);
 | 
			
		||||
            if (processor instanceof RuleEngineMsgNotificationRuleTriggerProcessor) {
 | 
			
		||||
                Set<String> supportedMsgTypes = ((RuleEngineMsgNotificationRuleTriggerProcessor<?>) processor).getSupportedMsgTypes();
 | 
			
		||||
                supportedMsgTypes.forEach(supportedMsgType -> {
 | 
			
		||||
                    ruleEngineMsgTypeToTriggerType.put(supportedMsgType, processor.getTriggerType());
 | 
			
		||||
                });
 | 
			
		||||
            }
 | 
			
		||||
        });
 | 
			
		||||
        RuleEngineMsgTrigger.msgTypeToTriggerType = ruleEngineMsgTypeToTriggerType;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Data
 | 
			
		||||
    private static class SentNotification implements Serializable {
 | 
			
		||||
        private static final long serialVersionUID = 38973480405095422L;
 | 
			
		||||
 | 
			
		||||
        private final NotificationRuleTrigger trigger;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -17,7 +17,6 @@ package org.thingsboard.server.service.notification.rule.cache;
 | 
			
		||||
 | 
			
		||||
import com.github.benmanes.caffeine.cache.Cache;
 | 
			
		||||
import com.github.benmanes.caffeine.cache.Caffeine;
 | 
			
		||||
import lombok.Data;
 | 
			
		||||
import lombok.RequiredArgsConstructor;
 | 
			
		||||
import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
import org.springframework.beans.factory.annotation.Value;
 | 
			
		||||
@ -50,7 +49,7 @@ public class DefaultNotificationRulesCache implements NotificationRulesCache {
 | 
			
		||||
    private int cacheMaxSize;
 | 
			
		||||
    @Value("${cache.notificationRules.timeToLiveInMinutes:30}")
 | 
			
		||||
    private int cacheValueTtl;
 | 
			
		||||
    private Cache<CacheKey, List<NotificationRule>> cache;
 | 
			
		||||
    private Cache<String, List<NotificationRule>> cache;
 | 
			
		||||
 | 
			
		||||
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
 | 
			
		||||
 | 
			
		||||
@ -96,21 +95,15 @@ public class DefaultNotificationRulesCache implements NotificationRulesCache {
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void evict(TenantId tenantId) {
 | 
			
		||||
    public void evict(TenantId tenantId) {
 | 
			
		||||
        cache.invalidateAll(Arrays.stream(NotificationRuleTriggerType.values())
 | 
			
		||||
                .map(triggerType -> key(tenantId, triggerType))
 | 
			
		||||
                .collect(Collectors.toList()));
 | 
			
		||||
        log.trace("Evicted all notification rules for tenant {} from cache", tenantId);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private static CacheKey key(TenantId tenantId, NotificationRuleTriggerType triggerType) {
 | 
			
		||||
        return new CacheKey(tenantId, triggerType);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Data
 | 
			
		||||
    private static class CacheKey {
 | 
			
		||||
        private final TenantId tenantId;
 | 
			
		||||
        private final NotificationRuleTriggerType triggerType;
 | 
			
		||||
    private static String key(TenantId tenantId, NotificationRuleTriggerType triggerType) {
 | 
			
		||||
        return tenantId + "_" + triggerType;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -63,7 +63,7 @@ import org.thingsboard.server.queue.TbQueueConsumer;
 | 
			
		||||
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
 | 
			
		||||
import org.thingsboard.server.queue.discovery.PartitionService;
 | 
			
		||||
import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent;
 | 
			
		||||
import org.thingsboard.server.queue.notification.NotificationRuleProcessor;
 | 
			
		||||
import org.thingsboard.server.common.msg.notification.NotificationRuleProcessor;
 | 
			
		||||
import org.thingsboard.server.queue.provider.TbCoreQueueFactory;
 | 
			
		||||
import org.thingsboard.server.queue.util.AfterStartUp;
 | 
			
		||||
import org.thingsboard.server.queue.util.DataDecodingEncodingService;
 | 
			
		||||
 | 
			
		||||
@ -52,7 +52,7 @@ import org.thingsboard.server.common.msg.queue.TbCallback;
 | 
			
		||||
import org.thingsboard.server.common.stats.TbApiUsageReportClient;
 | 
			
		||||
import org.thingsboard.server.dao.alarm.AlarmOperationResult;
 | 
			
		||||
import org.thingsboard.server.dao.alarm.AlarmService;
 | 
			
		||||
import org.thingsboard.server.queue.notification.NotificationRuleProcessor;
 | 
			
		||||
import org.thingsboard.server.common.msg.notification.NotificationRuleProcessor;
 | 
			
		||||
import org.thingsboard.server.service.apiusage.TbApiUsageStateService;
 | 
			
		||||
import org.thingsboard.server.service.entitiy.alarm.TbAlarmCommentService;
 | 
			
		||||
import org.thingsboard.server.service.subscription.TbSubscriptionUtils;
 | 
			
		||||
 | 
			
		||||
@ -29,7 +29,7 @@ import org.thingsboard.common.util.JacksonUtil;
 | 
			
		||||
import org.thingsboard.common.util.ThingsBoardThreadFactory;
 | 
			
		||||
import org.thingsboard.server.common.data.UpdateMessage;
 | 
			
		||||
import org.thingsboard.server.common.msg.notification.trigger.NewPlatformVersionTrigger;
 | 
			
		||||
import org.thingsboard.server.queue.notification.NotificationRuleProcessor;
 | 
			
		||||
import org.thingsboard.server.common.msg.notification.NotificationRuleProcessor;
 | 
			
		||||
import org.thingsboard.server.queue.util.AfterStartUp;
 | 
			
		||||
import org.thingsboard.server.queue.util.TbCoreComponent;
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -1263,9 +1263,9 @@ notification_system:
 | 
			
		||||
  thread_pool_size: "${TB_NOTIFICATION_SYSTEM_THREAD_POOL_SIZE:10}"
 | 
			
		||||
  rules:
 | 
			
		||||
    trigger_types_configs:
 | 
			
		||||
      RATE_LIMITS:
 | 
			
		||||
        # In milliseconds, 4 hours by default
 | 
			
		||||
        deduplication_duration: "${RATE_LIMITS_NOTIFICATION_RULE_DEDUPLICATION_DURATION:14400000}"
 | 
			
		||||
      NEW_PLATFORM_VERSION:
 | 
			
		||||
        # In milliseconds, infinitely by default
 | 
			
		||||
        deduplication_duration: "${NEW_PLATFORM_VERSION_NOTIFICATION_RULE_DEDUPLICATION_DURATION:0}"
 | 
			
		||||
 | 
			
		||||
management:
 | 
			
		||||
  endpoints:
 | 
			
		||||
 | 
			
		||||
@ -62,6 +62,9 @@ public class NotificationRequestStats {
 | 
			
		||||
            return;
 | 
			
		||||
        }
 | 
			
		||||
        String errorMessage = error.getMessage();
 | 
			
		||||
        if (errorMessage == null) {
 | 
			
		||||
            errorMessage = error.getClass().getSimpleName();
 | 
			
		||||
        }
 | 
			
		||||
        errors.computeIfAbsent(deliveryMethod, k -> new ConcurrentHashMap<>()).put(recipient.getTitle(), errorMessage);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -16,7 +16,6 @@
 | 
			
		||||
package org.thingsboard.server.common.data.util;
 | 
			
		||||
 | 
			
		||||
import java.util.Collection;
 | 
			
		||||
import java.util.Collections;
 | 
			
		||||
import java.util.HashMap;
 | 
			
		||||
import java.util.Map;
 | 
			
		||||
import java.util.Set;
 | 
			
		||||
@ -51,20 +50,19 @@ public class CollectionsUtil {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @SuppressWarnings("unchecked")
 | 
			
		||||
    public static <K, V> Map<K, V> mapOf(Object... kvs) {
 | 
			
		||||
        Map<K, V> map = new HashMap<>();
 | 
			
		||||
    public static <T> Map<T, T> mapOf(T... kvs) {
 | 
			
		||||
        if (kvs.length % 2 != 0) {
 | 
			
		||||
            throw new IllegalArgumentException("Invalid number of parameters");
 | 
			
		||||
        }
 | 
			
		||||
        Map<T, T> map = new HashMap<>();
 | 
			
		||||
        for (int i = 0; i < kvs.length; i += 2) {
 | 
			
		||||
            K key = (K) kvs[i];
 | 
			
		||||
            V value = (V) kvs[i + 1];
 | 
			
		||||
            T key = kvs[i];
 | 
			
		||||
            T value = kvs[i + 1];
 | 
			
		||||
            map.put(key, value);
 | 
			
		||||
        }
 | 
			
		||||
        return map;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public static <K, V> Map<K, V> unmodifiableMapOf(Object... kvs) {
 | 
			
		||||
        return Collections.unmodifiableMap(mapOf(kvs));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public static <V> boolean emptyOrContains(Collection<V> collection, V element) {
 | 
			
		||||
        return isEmpty(collection) || collection.contains(element);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -203,10 +203,3 @@ service:
 | 
			
		||||
  type: "${TB_SERVICE_TYPE:tb-vc-executor}"
 | 
			
		||||
  # Unique id for this service (autogenerated if empty)
 | 
			
		||||
  id: "${TB_SERVICE_ID:}"
 | 
			
		||||
 | 
			
		||||
notification_system:
 | 
			
		||||
  rules:
 | 
			
		||||
    trigger_types_configs:
 | 
			
		||||
      RATE_LIMITS:
 | 
			
		||||
        # In milliseconds, 4 hours by default
 | 
			
		||||
        deduplication_duration: "${RATE_LIMITS_NOTIFICATION_RULE_DEDUPLICATION_DURATION:14400000}"
 | 
			
		||||
 | 
			
		||||
@ -302,10 +302,3 @@ management:
 | 
			
		||||
      exposure:
 | 
			
		||||
        # Expose metrics endpoint (use value 'prometheus' to enable prometheus metrics).
 | 
			
		||||
        include: '${METRICS_ENDPOINTS_EXPOSE:info}'
 | 
			
		||||
 | 
			
		||||
notification_system:
 | 
			
		||||
  rules:
 | 
			
		||||
    trigger_types_configs:
 | 
			
		||||
      RATE_LIMITS:
 | 
			
		||||
        # In milliseconds, 4 hours by default
 | 
			
		||||
        deduplication_duration: "${RATE_LIMITS_NOTIFICATION_RULE_DEDUPLICATION_DURATION:14400000}"
 | 
			
		||||
 | 
			
		||||
@ -287,10 +287,3 @@ management:
 | 
			
		||||
      exposure:
 | 
			
		||||
        # Expose metrics endpoint (use value 'prometheus' to enable prometheus metrics).
 | 
			
		||||
        include: '${METRICS_ENDPOINTS_EXPOSE:info}'
 | 
			
		||||
 | 
			
		||||
notification_system:
 | 
			
		||||
  rules:
 | 
			
		||||
    trigger_types_configs:
 | 
			
		||||
      RATE_LIMITS:
 | 
			
		||||
        # In milliseconds, 4 hours by default
 | 
			
		||||
        deduplication_duration: "${RATE_LIMITS_NOTIFICATION_RULE_DEDUPLICATION_DURATION:14400000}"
 | 
			
		||||
 | 
			
		||||
@ -369,10 +369,3 @@ management:
 | 
			
		||||
      exposure:
 | 
			
		||||
        # Expose metrics endpoint (use value 'prometheus' to enable prometheus metrics).
 | 
			
		||||
        include: '${METRICS_ENDPOINTS_EXPOSE:info}'
 | 
			
		||||
 | 
			
		||||
notification_system:
 | 
			
		||||
  rules:
 | 
			
		||||
    trigger_types_configs:
 | 
			
		||||
      RATE_LIMITS:
 | 
			
		||||
        # In milliseconds, 4 hours by default
 | 
			
		||||
        deduplication_duration: "${RATE_LIMITS_NOTIFICATION_RULE_DEDUPLICATION_DURATION:14400000}"
 | 
			
		||||
 | 
			
		||||
@ -317,10 +317,3 @@ management:
 | 
			
		||||
      exposure:
 | 
			
		||||
        # Expose metrics endpoint (use value 'prometheus' to enable prometheus metrics).
 | 
			
		||||
        include: '${METRICS_ENDPOINTS_EXPOSE:info}'
 | 
			
		||||
 | 
			
		||||
notification_system:
 | 
			
		||||
  rules:
 | 
			
		||||
    trigger_types_configs:
 | 
			
		||||
      RATE_LIMITS:
 | 
			
		||||
        # In milliseconds, 4 hours by default
 | 
			
		||||
        deduplication_duration: "${RATE_LIMITS_NOTIFICATION_RULE_DEDUPLICATION_DURATION:14400000}"
 | 
			
		||||
 | 
			
		||||
@ -267,10 +267,3 @@ management:
 | 
			
		||||
      exposure:
 | 
			
		||||
        # Expose metrics endpoint (use value 'prometheus' to enable prometheus metrics).
 | 
			
		||||
        include: '${METRICS_ENDPOINTS_EXPOSE:info}'
 | 
			
		||||
 | 
			
		||||
notification_system:
 | 
			
		||||
  rules:
 | 
			
		||||
    trigger_types_configs:
 | 
			
		||||
      RATE_LIMITS:
 | 
			
		||||
        # In milliseconds, 4 hours by default
 | 
			
		||||
        deduplication_duration: "${RATE_LIMITS_NOTIFICATION_RULE_DEDUPLICATION_DURATION:14400000}"
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user