diff --git a/application/src/main/java/org/thingsboard/server/service/apiusage/DefaultTbApiUsageStateService.java b/application/src/main/java/org/thingsboard/server/service/apiusage/DefaultTbApiUsageStateService.java index 9e17641c69..7e2719598c 100644 --- a/application/src/main/java/org/thingsboard/server/service/apiusage/DefaultTbApiUsageStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/apiusage/DefaultTbApiUsageStateService.java @@ -61,7 +61,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceM import org.thingsboard.server.gen.transport.TransportProtos.UsageStatsKVProto; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.discovery.PartitionService; -import org.thingsboard.server.queue.notification.NotificationRuleProcessor; +import org.thingsboard.server.common.msg.notification.NotificationRuleProcessor; import org.thingsboard.server.service.apiusage.BaseApiUsageState.StatsCalculationResult; import org.thingsboard.server.service.executors.DbCallbackExecutorService; import org.thingsboard.server.service.mail.MailExecutorService; diff --git a/application/src/main/java/org/thingsboard/server/service/notification/DefaultNotificationCenter.java b/application/src/main/java/org/thingsboard/server/service/notification/DefaultNotificationCenter.java index 0cb9dc780b..7215776170 100644 --- a/application/src/main/java/org/thingsboard/server/service/notification/DefaultNotificationCenter.java +++ b/application/src/main/java/org/thingsboard/server/service/notification/DefaultNotificationCenter.java @@ -15,13 +15,10 @@ */ package org.thingsboard.server.service.notification; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; -import org.thingsboard.common.util.DonAsynchron; import org.thingsboard.rule.engine.api.NotificationCenter; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.User; @@ -59,14 +56,12 @@ import org.thingsboard.server.dao.notification.NotificationService; import org.thingsboard.server.dao.notification.NotificationSettingsService; import org.thingsboard.server.dao.notification.NotificationTargetService; import org.thingsboard.server.dao.notification.NotificationTemplateService; -import org.thingsboard.server.dao.user.UserService; +import org.thingsboard.server.dao.util.limits.LimitedApi; +import org.thingsboard.server.dao.util.limits.RateLimitService; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.discovery.NotificationsTopicService; import org.thingsboard.server.queue.provider.TbQueueProducerProvider; -import org.thingsboard.server.dao.util.limits.LimitedApi; -import org.thingsboard.server.dao.util.limits.RateLimitService; -import org.thingsboard.server.service.executors.DbCallbackExecutorService; import org.thingsboard.server.service.executors.NotificationExecutorService; import org.thingsboard.server.service.notification.channels.NotificationChannel; import org.thingsboard.server.service.subscription.TbSubscriptionUtils; @@ -74,7 +69,6 @@ import org.thingsboard.server.service.telemetry.AbstractSubscriptionService; import org.thingsboard.server.service.ws.notification.sub.NotificationRequestUpdate; import org.thingsboard.server.service.ws.notification.sub.NotificationUpdate; -import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; import java.util.List; @@ -87,7 +81,7 @@ import java.util.stream.Collectors; @Service @Slf4j @RequiredArgsConstructor -@SuppressWarnings({"UnstableApiUsage", "rawtypes"}) +@SuppressWarnings({"rawtypes"}) public class DefaultNotificationCenter extends AbstractSubscriptionService implements NotificationCenter, NotificationChannel { private final NotificationTargetService notificationTargetService; @@ -95,9 +89,7 @@ public class DefaultNotificationCenter extends AbstractSubscriptionService imple private final NotificationService notificationService; private final NotificationTemplateService notificationTemplateService; private final NotificationSettingsService notificationSettingsService; - private final UserService userService; private final NotificationExecutorService notificationExecutor; - private final DbCallbackExecutorService dbCallbackExecutorService; private final NotificationsTopicService notificationsTopicService; private final TbQueueProducerProvider producerProvider; private final RateLimitService rateLimitService; @@ -172,37 +164,32 @@ public class DefaultNotificationCenter extends AbstractSubscriptionService imple .build(); notificationExecutor.submit(() -> { - List> results = new ArrayList<>(); - for (NotificationTarget target : targets) { - List> result = processForTarget(target, ctx); - results.addAll(result); + processForTarget(target, ctx); } - Futures.whenAllComplete(results).run(() -> { - NotificationRequestId requestId = ctx.getRequest().getId(); - log.debug("[{}] Notification request processing is finished", requestId); - NotificationRequestStats stats = ctx.getStats(); - try { - notificationRequestService.updateNotificationRequest(tenantId, requestId, NotificationRequestStatus.SENT, stats); - } catch (Exception e) { - log.error("[{}] Failed to update stats for notification request", requestId, e); - } + NotificationRequestId requestId = ctx.getRequest().getId(); + log.debug("[{}] Notification request processing is finished", requestId); + NotificationRequestStats stats = ctx.getStats(); + try { + notificationRequestService.updateNotificationRequest(tenantId, requestId, NotificationRequestStatus.SENT, stats); + } catch (Exception e) { + log.error("[{}] Failed to update stats for notification request", requestId, e); + } - if (callback != null) { - try { - callback.accept(stats); - } catch (Exception e) { - log.error("Failed to process callback for notification request {}", requestId, e); - } + if (callback != null) { + try { + callback.accept(stats); + } catch (Exception e) { + log.error("Failed to process callback for notification request {}", requestId, e); } - }, dbCallbackExecutorService); + } }); return request; } - private List> processForTarget(NotificationTarget target, NotificationProcessingContext ctx) { + private void processForTarget(NotificationTarget target, NotificationProcessingContext ctx) { Iterable recipients; switch (target.getConfiguration().getType()) { case PLATFORM_USERS: { @@ -231,43 +218,35 @@ public class DefaultNotificationCenter extends AbstractSubscriptionService imple Set deliveryMethods = new HashSet<>(ctx.getDeliveryMethods()); deliveryMethods.removeIf(deliveryMethod -> !target.getConfiguration().getType().getSupportedDeliveryMethods().contains(deliveryMethod)); log.debug("[{}] Processing notification request for {} target ({}) for delivery methods {}", ctx.getRequest().getId(), target.getConfiguration().getType(), target.getId(), deliveryMethods); + if (deliveryMethods.isEmpty()) { + return; + } - List> results = new ArrayList<>(); - if (!deliveryMethods.isEmpty()) { - for (NotificationRecipient recipient : recipients) { - for (NotificationDeliveryMethod deliveryMethod : deliveryMethods) { - ListenableFuture resultFuture = processForRecipient(deliveryMethod, recipient, ctx); - DonAsynchron.withCallback(resultFuture, result -> { - ctx.getStats().reportSent(deliveryMethod, recipient); - }, error -> { - ctx.getStats().reportError(deliveryMethod, error, recipient); - }); - results.add(resultFuture); + for (NotificationRecipient recipient : recipients) { + for (NotificationDeliveryMethod deliveryMethod : deliveryMethods) { + try { + processForRecipient(deliveryMethod, recipient, ctx); + ctx.getStats().reportSent(deliveryMethod, recipient); + } catch (Exception error) { + ctx.getStats().reportError(deliveryMethod, error, recipient); } } } - return results; } - private ListenableFuture processForRecipient(NotificationDeliveryMethod deliveryMethod, NotificationRecipient recipient, NotificationProcessingContext ctx) { + private void processForRecipient(NotificationDeliveryMethod deliveryMethod, NotificationRecipient recipient, NotificationProcessingContext ctx) throws Exception { if (ctx.getStats().contains(deliveryMethod, recipient.getId())) { - return Futures.immediateFailedFuture(new AlreadySentException()); + throw new AlreadySentException(); } - - DeliveryMethodNotificationTemplate processedTemplate; - try { - processedTemplate = ctx.getProcessedTemplate(deliveryMethod, recipient); - } catch (Exception e) { - return Futures.immediateFailedFuture(e); - } - NotificationChannel notificationChannel = channels.get(deliveryMethod); + DeliveryMethodNotificationTemplate processedTemplate = ctx.getProcessedTemplate(deliveryMethod, recipient); + log.trace("[{}] Sending {} notification for recipient {}", ctx.getRequest().getId(), deliveryMethod, recipient); - return notificationChannel.sendNotification(recipient, processedTemplate, ctx); + notificationChannel.sendNotification(recipient, processedTemplate, ctx); } @Override - public ListenableFuture sendNotification(User recipient, WebDeliveryMethodNotificationTemplate processedTemplate, NotificationProcessingContext ctx) { + public void sendNotification(User recipient, WebDeliveryMethodNotificationTemplate processedTemplate, NotificationProcessingContext ctx) throws Exception { NotificationRequest request = ctx.getRequest(); Notification notification = Notification.builder() .requestId(request.getId()) @@ -283,14 +262,14 @@ public class DefaultNotificationCenter extends AbstractSubscriptionService imple notification = notificationService.saveNotification(recipient.getTenantId(), notification); } catch (Exception e) { log.error("Failed to create notification for recipient {}", recipient.getId(), e); - return Futures.immediateFailedFuture(e); + throw e; } NotificationUpdate update = NotificationUpdate.builder() .created(true) .notification(notification) .build(); - return onNotificationUpdate(recipient.getTenantId(), recipient.getId(), update); + onNotificationUpdate(recipient.getTenantId(), recipient.getId(), update); } @Override @@ -384,13 +363,11 @@ public class DefaultNotificationCenter extends AbstractSubscriptionService imple clusterService.pushMsgToCore(tenantId, notificationRequestId, toCoreMsg, null); } - private ListenableFuture onNotificationUpdate(TenantId tenantId, UserId recipientId, NotificationUpdate update) { + private void onNotificationUpdate(TenantId tenantId, UserId recipientId, NotificationUpdate update) { log.trace("Submitting notification update for recipient {}: {}", recipientId, update); - return Futures.submit(() -> { - forwardToSubscriptionManagerService(tenantId, recipientId, subscriptionManagerService -> { - subscriptionManagerService.onNotificationUpdate(tenantId, recipientId, update, TbCallback.EMPTY); - }, () -> TbSubscriptionUtils.notificationUpdateToProto(tenantId, recipientId, update)); - }, wsCallBackExecutor); + forwardToSubscriptionManagerService(tenantId, recipientId, subscriptionManagerService -> { + subscriptionManagerService.onNotificationUpdate(tenantId, recipientId, update, TbCallback.EMPTY); + }, () -> TbSubscriptionUtils.notificationUpdateToProto(tenantId, recipientId, update)); } private void onNotificationRequestUpdate(TenantId tenantId, NotificationRequestUpdate update) { diff --git a/application/src/main/java/org/thingsboard/server/service/notification/channels/EmailNotificationChannel.java b/application/src/main/java/org/thingsboard/server/service/notification/channels/EmailNotificationChannel.java index 8b3c9551c2..8e9e8525e3 100644 --- a/application/src/main/java/org/thingsboard/server/service/notification/channels/EmailNotificationChannel.java +++ b/application/src/main/java/org/thingsboard/server/service/notification/channels/EmailNotificationChannel.java @@ -15,7 +15,6 @@ */ package org.thingsboard.server.service.notification.channels; -import com.google.common.util.concurrent.ListenableFuture; import lombok.RequiredArgsConstructor; import org.springframework.stereotype.Component; import org.thingsboard.rule.engine.api.MailService; @@ -24,7 +23,6 @@ import org.thingsboard.server.common.data.User; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.notification.NotificationDeliveryMethod; import org.thingsboard.server.common.data.notification.template.EmailDeliveryMethodNotificationTemplate; -import org.thingsboard.server.service.mail.MailExecutorService; import org.thingsboard.server.service.notification.NotificationProcessingContext; @Component @@ -32,19 +30,15 @@ import org.thingsboard.server.service.notification.NotificationProcessingContext public class EmailNotificationChannel implements NotificationChannel { private final MailService mailService; - private final MailExecutorService executor; @Override - public ListenableFuture sendNotification(User recipient, EmailDeliveryMethodNotificationTemplate processedTemplate, NotificationProcessingContext ctx) { - return executor.submit(() -> { - mailService.send(recipient.getTenantId(), null, TbEmail.builder() - .to(recipient.getEmail()) - .subject(processedTemplate.getSubject()) - .body(processedTemplate.getBody()) - .html(true) - .build()); - return null; - }); + public void sendNotification(User recipient, EmailDeliveryMethodNotificationTemplate processedTemplate, NotificationProcessingContext ctx) throws Exception { + mailService.send(recipient.getTenantId(), null, TbEmail.builder() + .to(recipient.getEmail()) + .subject(processedTemplate.getSubject()) + .body(processedTemplate.getBody()) + .html(true) + .build()); } @Override diff --git a/application/src/main/java/org/thingsboard/server/service/notification/channels/NotificationChannel.java b/application/src/main/java/org/thingsboard/server/service/notification/channels/NotificationChannel.java index 02fe6264d2..0275644765 100644 --- a/application/src/main/java/org/thingsboard/server/service/notification/channels/NotificationChannel.java +++ b/application/src/main/java/org/thingsboard/server/service/notification/channels/NotificationChannel.java @@ -15,7 +15,6 @@ */ package org.thingsboard.server.service.notification.channels; -import com.google.common.util.concurrent.ListenableFuture; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.notification.NotificationDeliveryMethod; import org.thingsboard.server.common.data.notification.targets.NotificationRecipient; @@ -24,7 +23,7 @@ import org.thingsboard.server.service.notification.NotificationProcessingContext public interface NotificationChannel { - ListenableFuture sendNotification(R recipient, T processedTemplate, NotificationProcessingContext ctx); + void sendNotification(R recipient, T processedTemplate, NotificationProcessingContext ctx) throws Exception; void check(TenantId tenantId) throws Exception; diff --git a/application/src/main/java/org/thingsboard/server/service/notification/channels/SlackNotificationChannel.java b/application/src/main/java/org/thingsboard/server/service/notification/channels/SlackNotificationChannel.java index 46afbd7270..25c6b9dcab 100644 --- a/application/src/main/java/org/thingsboard/server/service/notification/channels/SlackNotificationChannel.java +++ b/application/src/main/java/org/thingsboard/server/service/notification/channels/SlackNotificationChannel.java @@ -15,7 +15,6 @@ */ package org.thingsboard.server.service.notification.channels; -import com.google.common.util.concurrent.ListenableFuture; import lombok.RequiredArgsConstructor; import org.springframework.stereotype.Component; import org.thingsboard.rule.engine.api.slack.SlackService; @@ -26,7 +25,6 @@ import org.thingsboard.server.common.data.notification.settings.SlackNotificatio import org.thingsboard.server.common.data.notification.targets.slack.SlackConversation; import org.thingsboard.server.common.data.notification.template.SlackDeliveryMethodNotificationTemplate; import org.thingsboard.server.dao.notification.NotificationSettingsService; -import org.thingsboard.server.service.executors.ExternalCallExecutorService; import org.thingsboard.server.service.notification.NotificationProcessingContext; @Component @@ -35,15 +33,11 @@ public class SlackNotificationChannel implements NotificationChannel sendNotification(SlackConversation conversation, SlackDeliveryMethodNotificationTemplate processedTemplate, NotificationProcessingContext ctx) { + public void sendNotification(SlackConversation conversation, SlackDeliveryMethodNotificationTemplate processedTemplate, NotificationProcessingContext ctx) throws Exception { SlackNotificationDeliveryMethodConfig config = ctx.getDeliveryMethodConfig(NotificationDeliveryMethod.SLACK); - return executor.submit(() -> { - slackService.sendMessage(ctx.getTenantId(), config.getBotToken(), conversation.getId(), processedTemplate.getBody()); - return null; - }); + slackService.sendMessage(ctx.getTenantId(), config.getBotToken(), conversation.getId(), processedTemplate.getBody()); } @Override diff --git a/application/src/main/java/org/thingsboard/server/service/notification/channels/SmsNotificationChannel.java b/application/src/main/java/org/thingsboard/server/service/notification/channels/SmsNotificationChannel.java index 3c44dabd4b..44b61d3bb3 100644 --- a/application/src/main/java/org/thingsboard/server/service/notification/channels/SmsNotificationChannel.java +++ b/application/src/main/java/org/thingsboard/server/service/notification/channels/SmsNotificationChannel.java @@ -15,8 +15,6 @@ */ package org.thingsboard.server.service.notification.channels; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; import lombok.RequiredArgsConstructor; import org.apache.commons.lang3.StringUtils; import org.springframework.stereotype.Component; @@ -26,26 +24,21 @@ import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.notification.NotificationDeliveryMethod; import org.thingsboard.server.common.data.notification.template.SmsDeliveryMethodNotificationTemplate; import org.thingsboard.server.service.notification.NotificationProcessingContext; -import org.thingsboard.server.service.sms.SmsExecutorService; @Component @RequiredArgsConstructor public class SmsNotificationChannel implements NotificationChannel { private final SmsService smsService; - private final SmsExecutorService executor; @Override - public ListenableFuture sendNotification(User recipient, SmsDeliveryMethodNotificationTemplate processedTemplate, NotificationProcessingContext ctx) { + public void sendNotification(User recipient, SmsDeliveryMethodNotificationTemplate processedTemplate, NotificationProcessingContext ctx) throws Exception { String phone = recipient.getPhone(); if (StringUtils.isBlank(phone)) { - return Futures.immediateFailedFuture(new RuntimeException("User does not have phone number")); + throw new RuntimeException("User does not have phone number"); } - return executor.submit(() -> { - smsService.sendSms(recipient.getTenantId(), recipient.getCustomerId(), new String[]{phone}, processedTemplate.getBody()); - return null; - }); + smsService.sendSms(recipient.getTenantId(), recipient.getCustomerId(), new String[]{phone}, processedTemplate.getBody()); } @Override diff --git a/application/src/main/java/org/thingsboard/server/service/notification/rule/DefaultNotificationRuleProcessor.java b/application/src/main/java/org/thingsboard/server/service/notification/rule/DefaultNotificationRuleProcessor.java index 297ccc4007..71a59026a2 100644 --- a/application/src/main/java/org/thingsboard/server/service/notification/rule/DefaultNotificationRuleProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/notification/rule/DefaultNotificationRuleProcessor.java @@ -15,10 +15,11 @@ */ package org.thingsboard.server.service.notification.rule; -import lombok.Data; import lombok.RequiredArgsConstructor; +import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.cache.Cache; import org.springframework.cache.CacheManager; import org.springframework.context.annotation.Lazy; @@ -38,29 +39,27 @@ import org.thingsboard.server.common.data.notification.info.NotificationInfo; import org.thingsboard.server.common.data.notification.rule.NotificationRule; import org.thingsboard.server.common.data.notification.rule.trigger.NotificationRuleTriggerConfig; import org.thingsboard.server.common.data.notification.rule.trigger.NotificationRuleTriggerType; +import org.thingsboard.server.common.data.notification.settings.TriggerTypeConfig; import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent; +import org.thingsboard.server.common.msg.notification.NotificationRuleProcessor; import org.thingsboard.server.common.msg.notification.trigger.NotificationRuleTrigger; -import org.thingsboard.server.common.msg.notification.trigger.RuleEngineMsgTrigger; import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.dao.notification.NotificationRequestService; import org.thingsboard.server.dao.util.limits.LimitedApi; import org.thingsboard.server.dao.util.limits.RateLimitService; import org.thingsboard.server.queue.discovery.PartitionService; -import org.thingsboard.server.queue.notification.NotificationRuleProcessor; import org.thingsboard.server.service.executors.NotificationExecutorService; import org.thingsboard.server.service.notification.rule.cache.NotificationRulesCache; import org.thingsboard.server.service.notification.rule.trigger.NotificationRuleTriggerProcessor; -import org.thingsboard.server.service.notification.rule.trigger.RuleEngineMsgNotificationRuleTriggerProcessor; import javax.annotation.PostConstruct; -import java.io.Serializable; +import java.util.ArrayList; import java.util.Collection; import java.util.EnumMap; -import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Set; +import java.util.Optional; import java.util.UUID; import java.util.stream.Collectors; @@ -96,20 +95,27 @@ public class DefaultNotificationRuleProcessor implements NotificationRuleProcess @Override public void process(NotificationRuleTrigger trigger) { NotificationRuleTriggerType triggerType = trigger.getType(); - if (triggerType == null) return; TenantId tenantId = triggerType.isTenantLevel() ? trigger.getTenantId() : TenantId.SYS_TENANT_ID; try { - List rules = notificationRulesCache.getEnabled(tenantId, triggerType); - for (NotificationRule rule : rules) { - notificationExecutor.submit(() -> { + List enabledRules = notificationRulesCache.getEnabled(tenantId, triggerType); + if (enabledRules.isEmpty()) { + return; + } + if (trigger.deduplicate()) { + enabledRules = new ArrayList<>(enabledRules); + enabledRules.removeIf(rule -> alreadySent(rule, trigger)); + } + final List rules = enabledRules; + notificationExecutor.submit(() -> { + for (NotificationRule rule : rules) { try { processNotificationRule(rule, trigger); } catch (Throwable e) { log.error("Failed to process notification rule {} for trigger type {} with trigger object {}", rule.getId(), rule.getTriggerType(), trigger, e); } - }); - } + } + }); } catch (Throwable e) { log.error("Failed to process notification rules for trigger: {}", trigger, e); } @@ -172,14 +178,13 @@ public class DefaultNotificationRuleProcessor implements NotificationRuleProcess .ruleId(rule.getId()) .originatorEntityId(originatorEntityId) .build(); - notificationExecutor.submit(() -> { - try { - log.debug("Submitting notification request for rule '{}' with delay of {} sec to targets {}", rule.getName(), delayInSec, targets); - notificationCenter.processNotificationRequest(rule.getTenantId(), notificationRequest, null); - } catch (Exception e) { - log.error("Failed to process notification request for tenant {} for rule {}", rule.getTenantId(), rule.getId(), e); - } - }); + + try { + log.debug("Submitting notification request for rule '{}' with delay of {} sec to targets {}", rule.getName(), delayInSec, targets); + notificationCenter.processNotificationRequest(rule.getTenantId(), notificationRequest, null); + } catch (Exception e) { + log.error("Failed to process notification request for tenant {} for rule {}", rule.getTenantId(), rule.getId(), e); + } } private boolean matchesFilter(NotificationRuleTrigger trigger, NotificationRuleTriggerConfig triggerConfig) { @@ -243,24 +248,9 @@ public class DefaultNotificationRuleProcessor implements NotificationRuleProcess @Autowired public void setTriggerProcessors(Collection processors) { - Map ruleEngineMsgTypeToTriggerType = new HashMap<>(); processors.forEach(processor -> { triggerProcessors.put(processor.getTriggerType(), processor); - if (processor instanceof RuleEngineMsgNotificationRuleTriggerProcessor) { - Set supportedMsgTypes = ((RuleEngineMsgNotificationRuleTriggerProcessor) processor).getSupportedMsgTypes(); - supportedMsgTypes.forEach(supportedMsgType -> { - ruleEngineMsgTypeToTriggerType.put(supportedMsgType, processor.getTriggerType()); - }); - } }); - RuleEngineMsgTrigger.msgTypeToTriggerType = ruleEngineMsgTypeToTriggerType; - } - - @Data - private static class SentNotification implements Serializable { - private static final long serialVersionUID = 38973480405095422L; - - private final NotificationRuleTrigger trigger; } } diff --git a/application/src/main/java/org/thingsboard/server/service/notification/rule/cache/DefaultNotificationRulesCache.java b/application/src/main/java/org/thingsboard/server/service/notification/rule/cache/DefaultNotificationRulesCache.java index a02b7bf1e4..a43ed59efa 100644 --- a/application/src/main/java/org/thingsboard/server/service/notification/rule/cache/DefaultNotificationRulesCache.java +++ b/application/src/main/java/org/thingsboard/server/service/notification/rule/cache/DefaultNotificationRulesCache.java @@ -17,7 +17,6 @@ package org.thingsboard.server.service.notification.rule.cache; import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Caffeine; -import lombok.Data; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; @@ -50,7 +49,7 @@ public class DefaultNotificationRulesCache implements NotificationRulesCache { private int cacheMaxSize; @Value("${cache.notificationRules.timeToLiveInMinutes:30}") private int cacheValueTtl; - private Cache> cache; + private Cache> cache; private final ReadWriteLock lock = new ReentrantReadWriteLock(); @@ -96,21 +95,15 @@ public class DefaultNotificationRulesCache implements NotificationRulesCache { } } - private void evict(TenantId tenantId) { + public void evict(TenantId tenantId) { cache.invalidateAll(Arrays.stream(NotificationRuleTriggerType.values()) .map(triggerType -> key(tenantId, triggerType)) .collect(Collectors.toList())); log.trace("Evicted all notification rules for tenant {} from cache", tenantId); } - private static CacheKey key(TenantId tenantId, NotificationRuleTriggerType triggerType) { - return new CacheKey(tenantId, triggerType); - } - - @Data - private static class CacheKey { - private final TenantId tenantId; - private final NotificationRuleTriggerType triggerType; + private static String key(TenantId tenantId, NotificationRuleTriggerType triggerType) { + return tenantId + "_" + triggerType; } } diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java index 2bd0043046..182fce1c48 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java @@ -63,7 +63,7 @@ import org.thingsboard.server.queue.TbQueueConsumer; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent; -import org.thingsboard.server.queue.notification.NotificationRuleProcessor; +import org.thingsboard.server.common.msg.notification.NotificationRuleProcessor; import org.thingsboard.server.queue.provider.TbCoreQueueFactory; import org.thingsboard.server.queue.util.AfterStartUp; import org.thingsboard.server.queue.util.DataDecodingEncodingService; diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultAlarmSubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultAlarmSubscriptionService.java index fec9d10a18..e7f2a5a66f 100644 --- a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultAlarmSubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultAlarmSubscriptionService.java @@ -52,7 +52,7 @@ import org.thingsboard.server.common.msg.queue.TbCallback; import org.thingsboard.server.common.stats.TbApiUsageReportClient; import org.thingsboard.server.dao.alarm.AlarmOperationResult; import org.thingsboard.server.dao.alarm.AlarmService; -import org.thingsboard.server.queue.notification.NotificationRuleProcessor; +import org.thingsboard.server.common.msg.notification.NotificationRuleProcessor; import org.thingsboard.server.service.apiusage.TbApiUsageStateService; import org.thingsboard.server.service.entitiy.alarm.TbAlarmCommentService; import org.thingsboard.server.service.subscription.TbSubscriptionUtils; diff --git a/application/src/main/java/org/thingsboard/server/service/update/DefaultUpdateService.java b/application/src/main/java/org/thingsboard/server/service/update/DefaultUpdateService.java index ef066a73f6..587d86af32 100644 --- a/application/src/main/java/org/thingsboard/server/service/update/DefaultUpdateService.java +++ b/application/src/main/java/org/thingsboard/server/service/update/DefaultUpdateService.java @@ -29,7 +29,7 @@ import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.server.common.data.UpdateMessage; import org.thingsboard.server.common.msg.notification.trigger.NewPlatformVersionTrigger; -import org.thingsboard.server.queue.notification.NotificationRuleProcessor; +import org.thingsboard.server.common.msg.notification.NotificationRuleProcessor; import org.thingsboard.server.queue.util.AfterStartUp; import org.thingsboard.server.queue.util.TbCoreComponent; diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index f780d42a0d..392587c869 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -1263,9 +1263,9 @@ notification_system: thread_pool_size: "${TB_NOTIFICATION_SYSTEM_THREAD_POOL_SIZE:10}" rules: trigger_types_configs: - RATE_LIMITS: - # In milliseconds, 4 hours by default - deduplication_duration: "${RATE_LIMITS_NOTIFICATION_RULE_DEDUPLICATION_DURATION:14400000}" + NEW_PLATFORM_VERSION: + # In milliseconds, infinitely by default + deduplication_duration: "${NEW_PLATFORM_VERSION_NOTIFICATION_RULE_DEDUPLICATION_DURATION:0}" management: endpoints: diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/notification/NotificationRequestStats.java b/common/data/src/main/java/org/thingsboard/server/common/data/notification/NotificationRequestStats.java index 31e05a2cb3..619e1ad38f 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/notification/NotificationRequestStats.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/notification/NotificationRequestStats.java @@ -62,6 +62,9 @@ public class NotificationRequestStats { return; } String errorMessage = error.getMessage(); + if (errorMessage == null) { + errorMessage = error.getClass().getSimpleName(); + } errors.computeIfAbsent(deliveryMethod, k -> new ConcurrentHashMap<>()).put(recipient.getTitle(), errorMessage); } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/util/CollectionsUtil.java b/common/data/src/main/java/org/thingsboard/server/common/data/util/CollectionsUtil.java index d8d17613de..a0736ca50e 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/util/CollectionsUtil.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/util/CollectionsUtil.java @@ -16,7 +16,6 @@ package org.thingsboard.server.common.data.util; import java.util.Collection; -import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Set; @@ -51,20 +50,19 @@ public class CollectionsUtil { } @SuppressWarnings("unchecked") - public static Map mapOf(Object... kvs) { - Map map = new HashMap<>(); + public static Map mapOf(T... kvs) { + if (kvs.length % 2 != 0) { + throw new IllegalArgumentException("Invalid number of parameters"); + } + Map map = new HashMap<>(); for (int i = 0; i < kvs.length; i += 2) { - K key = (K) kvs[i]; - V value = (V) kvs[i + 1]; + T key = kvs[i]; + T value = kvs[i + 1]; map.put(key, value); } return map; } - public static Map unmodifiableMapOf(Object... kvs) { - return Collections.unmodifiableMap(mapOf(kvs)); - } - public static boolean emptyOrContains(Collection collection, V element) { return isEmpty(collection) || collection.contains(element); } diff --git a/msa/vc-executor/src/main/resources/tb-vc-executor.yml b/msa/vc-executor/src/main/resources/tb-vc-executor.yml index 75f9e09d3c..9d5ad35388 100644 --- a/msa/vc-executor/src/main/resources/tb-vc-executor.yml +++ b/msa/vc-executor/src/main/resources/tb-vc-executor.yml @@ -203,10 +203,3 @@ service: type: "${TB_SERVICE_TYPE:tb-vc-executor}" # Unique id for this service (autogenerated if empty) id: "${TB_SERVICE_ID:}" - -notification_system: - rules: - trigger_types_configs: - RATE_LIMITS: - # In milliseconds, 4 hours by default - deduplication_duration: "${RATE_LIMITS_NOTIFICATION_RULE_DEDUPLICATION_DURATION:14400000}" diff --git a/transport/coap/src/main/resources/tb-coap-transport.yml b/transport/coap/src/main/resources/tb-coap-transport.yml index 8fe079859a..7ea553fe5c 100644 --- a/transport/coap/src/main/resources/tb-coap-transport.yml +++ b/transport/coap/src/main/resources/tb-coap-transport.yml @@ -302,10 +302,3 @@ management: exposure: # Expose metrics endpoint (use value 'prometheus' to enable prometheus metrics). include: '${METRICS_ENDPOINTS_EXPOSE:info}' - -notification_system: - rules: - trigger_types_configs: - RATE_LIMITS: - # In milliseconds, 4 hours by default - deduplication_duration: "${RATE_LIMITS_NOTIFICATION_RULE_DEDUPLICATION_DURATION:14400000}" diff --git a/transport/http/src/main/resources/tb-http-transport.yml b/transport/http/src/main/resources/tb-http-transport.yml index f05db08643..346ec48eae 100644 --- a/transport/http/src/main/resources/tb-http-transport.yml +++ b/transport/http/src/main/resources/tb-http-transport.yml @@ -287,10 +287,3 @@ management: exposure: # Expose metrics endpoint (use value 'prometheus' to enable prometheus metrics). include: '${METRICS_ENDPOINTS_EXPOSE:info}' - -notification_system: - rules: - trigger_types_configs: - RATE_LIMITS: - # In milliseconds, 4 hours by default - deduplication_duration: "${RATE_LIMITS_NOTIFICATION_RULE_DEDUPLICATION_DURATION:14400000}" diff --git a/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml b/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml index 745a7d126a..4e8167d89d 100644 --- a/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml +++ b/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml @@ -369,10 +369,3 @@ management: exposure: # Expose metrics endpoint (use value 'prometheus' to enable prometheus metrics). include: '${METRICS_ENDPOINTS_EXPOSE:info}' - -notification_system: - rules: - trigger_types_configs: - RATE_LIMITS: - # In milliseconds, 4 hours by default - deduplication_duration: "${RATE_LIMITS_NOTIFICATION_RULE_DEDUPLICATION_DURATION:14400000}" diff --git a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml index 3795c533a7..1e0b1ebcd4 100644 --- a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml +++ b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml @@ -317,10 +317,3 @@ management: exposure: # Expose metrics endpoint (use value 'prometheus' to enable prometheus metrics). include: '${METRICS_ENDPOINTS_EXPOSE:info}' - -notification_system: - rules: - trigger_types_configs: - RATE_LIMITS: - # In milliseconds, 4 hours by default - deduplication_duration: "${RATE_LIMITS_NOTIFICATION_RULE_DEDUPLICATION_DURATION:14400000}" diff --git a/transport/snmp/src/main/resources/tb-snmp-transport.yml b/transport/snmp/src/main/resources/tb-snmp-transport.yml index 11dcc96010..9f086bcbc5 100644 --- a/transport/snmp/src/main/resources/tb-snmp-transport.yml +++ b/transport/snmp/src/main/resources/tb-snmp-transport.yml @@ -267,10 +267,3 @@ management: exposure: # Expose metrics endpoint (use value 'prometheus' to enable prometheus metrics). include: '${METRICS_ENDPOINTS_EXPOSE:info}' - -notification_system: - rules: - trigger_types_configs: - RATE_LIMITS: - # In milliseconds, 4 hours by default - deduplication_duration: "${RATE_LIMITS_NOTIFICATION_RULE_DEDUPLICATION_DURATION:14400000}"