diff --git a/application/src/main/java/org/thingsboard/server/service/notification/rule/DefaultNotificationRuleProcessor.java b/application/src/main/java/org/thingsboard/server/service/notification/rule/DefaultNotificationRuleProcessor.java index 57dba39063..8bfdc0e243 100644 --- a/application/src/main/java/org/thingsboard/server/service/notification/rule/DefaultNotificationRuleProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/notification/rule/DefaultNotificationRuleProcessor.java @@ -15,13 +15,17 @@ */ package org.thingsboard.server.service.notification.rule; +import lombok.Data; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.cache.Cache; +import org.springframework.cache.CacheManager; import org.springframework.context.annotation.Lazy; import org.springframework.context.event.EventListener; import org.springframework.stereotype.Service; import org.thingsboard.rule.engine.api.NotificationCenter; +import org.thingsboard.server.common.data.CacheConstants; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.NotificationRequestId; @@ -35,13 +39,13 @@ import org.thingsboard.server.common.data.notification.rule.NotificationRule; import org.thingsboard.server.common.data.notification.rule.trigger.NotificationRuleTriggerConfig; import org.thingsboard.server.common.data.notification.rule.trigger.NotificationRuleTriggerType; import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent; -import org.thingsboard.server.queue.notification.NotificationRuleProcessor; import org.thingsboard.server.common.msg.notification.trigger.NotificationRuleTrigger; import org.thingsboard.server.common.msg.notification.trigger.RuleEngineMsgTrigger; import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.dao.notification.NotificationRequestService; import org.thingsboard.server.queue.discovery.PartitionService; +import org.thingsboard.server.queue.notification.NotificationRuleProcessor; import org.thingsboard.server.service.apiusage.limits.LimitedApi; import org.thingsboard.server.service.apiusage.limits.RateLimitService; import org.thingsboard.server.service.executors.NotificationExecutorService; @@ -49,6 +53,8 @@ import org.thingsboard.server.service.notification.rule.cache.NotificationRulesC import org.thingsboard.server.service.notification.rule.trigger.NotificationRuleTriggerProcessor; import org.thingsboard.server.service.notification.rule.trigger.RuleEngineMsgNotificationRuleTriggerProcessor; +import javax.annotation.PostConstruct; +import java.io.Serializable; import java.util.Collection; import java.util.EnumMap; import java.util.HashMap; @@ -71,9 +77,19 @@ public class DefaultNotificationRuleProcessor implements NotificationRuleProcess @Autowired @Lazy private NotificationCenter notificationCenter; private final NotificationExecutorService notificationExecutor; + private final CacheManager cacheManager; + private Cache sentNotifications; private final Map triggerProcessors = new EnumMap<>(NotificationRuleTriggerType.class); + @PostConstruct + private void init() { + sentNotifications = cacheManager.getCache(CacheConstants.SENT_NOTIFICATIONS_CACHE); + if (sentNotifications == null) { + throw new IllegalStateException("Sent notifications cache is not set up"); + } + } + @Override public void process(NotificationRuleTrigger trigger) { NotificationRuleTriggerType triggerType = trigger.getType(); @@ -126,6 +142,9 @@ public class DefaultNotificationRuleProcessor implements NotificationRuleProcess log.debug("[{}] Rate limit for notification requests per rule was exceeded (rule '{}')", rule.getTenantId(), rule.getName()); return; } + if (trigger.getType().isDeduplicate() && alreadySent(rule.getId(), trigger)) { + return; + } NotificationInfo notificationInfo = constructNotificationInfo(trigger, triggerConfig); rule.getRecipientsConfig().getTargetsTable().forEach((delay, targets) -> { @@ -175,6 +194,23 @@ public class DefaultNotificationRuleProcessor implements NotificationRuleProcess return triggerProcessors.get(triggerConfig.getTriggerType()).constructNotificationInfo(trigger); } + private boolean alreadySent(NotificationRuleId ruleId, NotificationRuleTrigger trigger) { + String key = ruleId + "_" + trigger.getOriginatorEntityId(); + SentNotification sent = sentNotifications.get(key, SentNotification.class); + boolean alreadySent; + if (sent != null && sent.getTrigger().equals(trigger)) { + alreadySent = true; + log.debug("Notification for {} trigger was already sent, ignoring", trigger.getType()); + // updating cache anyway so that the value is not removed by ttl + } else { + alreadySent = false; + sent = new SentNotification(trigger); + } + log.trace("[{}] Putting to sentNotifications cache: {}", ruleId, trigger); + sentNotifications.put(key, sent); + return alreadySent; + } + @EventListener(ComponentLifecycleMsg.class) public void onNotificationRuleDeleted(ComponentLifecycleMsg componentLifecycleMsg) { if (componentLifecycleMsg.getEvent() != ComponentLifecycleEvent.DELETED || @@ -209,4 +245,11 @@ public class DefaultNotificationRuleProcessor implements NotificationRuleProcess RuleEngineMsgTrigger.msgTypeToTriggerType = ruleEngineMsgTypeToTriggerType; } + @Data + private static class SentNotification implements Serializable { + private static final long serialVersionUID = 38973480405095422L; + + private final NotificationRuleTrigger trigger; + } + } diff --git a/application/src/main/java/org/thingsboard/server/service/notification/rule/trigger/NewPlatformVersionTriggerProcessor.java b/application/src/main/java/org/thingsboard/server/service/notification/rule/trigger/NewPlatformVersionTriggerProcessor.java index cb44e7b3ef..639378ad72 100644 --- a/application/src/main/java/org/thingsboard/server/service/notification/rule/trigger/NewPlatformVersionTriggerProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/notification/rule/trigger/NewPlatformVersionTriggerProcessor.java @@ -18,26 +18,18 @@ package org.thingsboard.server.service.notification.rule.trigger; import lombok.RequiredArgsConstructor; import org.springframework.stereotype.Service; import org.thingsboard.server.common.data.UpdateMessage; -import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.notification.info.NewPlatformVersionNotificationInfo; import org.thingsboard.server.common.data.notification.info.RuleOriginatedNotificationInfo; import org.thingsboard.server.common.data.notification.rule.trigger.NewPlatformVersionNotificationRuleTriggerConfig; import org.thingsboard.server.common.data.notification.rule.trigger.NotificationRuleTriggerType; import org.thingsboard.server.common.msg.notification.trigger.NewPlatformVersionTrigger; -import org.thingsboard.server.common.msg.queue.ServiceType; -import org.thingsboard.server.queue.discovery.PartitionService; @Service @RequiredArgsConstructor public class NewPlatformVersionTriggerProcessor implements NotificationRuleTriggerProcessor { - private final PartitionService partitionService; - @Override public boolean matchesFilter(NewPlatformVersionTrigger trigger, NewPlatformVersionNotificationRuleTriggerConfig triggerConfig) { - if (!partitionService.isMyPartition(ServiceType.TB_CORE, TenantId.SYS_TENANT_ID, TenantId.SYS_TENANT_ID)) { - return false; - } return trigger.getUpdateInfo().isUpdateAvailable(); } diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 7d027c3836..e907b7b32d 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -445,6 +445,9 @@ cache: notificationSettings: timeToLiveInMinutes: "${CACHE_SPECS_NOTIFICATION_SETTINGS_TTL:10}" maxSize: "${CACHE_SPECS_NOTIFICATION_SETTINGS_MAX_SIZE:1000}" + sentNotifications: + timeToLiveInMinutes: "${CACHE_SPECS_SENT_NOTIFICATIONS_TTL:1440}" + maxSize: "${CACHE_SPECS_SENT_NOTIFICATIONS_MAX_SIZE:10000}" attributes: timeToLiveInMinutes: "${CACHE_SPECS_ATTRIBUTES_TTL:1440}" maxSize: "${CACHE_SPECS_ATTRIBUTES_MAX_SIZE:100000}" diff --git a/application/src/test/java/org/thingsboard/server/service/notification/AbstractNotificationApiTest.java b/application/src/test/java/org/thingsboard/server/service/notification/AbstractNotificationApiTest.java index b5238d675b..a22143ab1c 100644 --- a/application/src/test/java/org/thingsboard/server/service/notification/AbstractNotificationApiTest.java +++ b/application/src/test/java/org/thingsboard/server/service/notification/AbstractNotificationApiTest.java @@ -204,7 +204,7 @@ public abstract class AbstractNotificationApiTest extends AbstractControllerTest NotificationTemplate template = createNotificationTemplate(NotificationType.valueOf(triggerConfig.getTriggerType().toString()), subject, text, NotificationDeliveryMethod.WEB); NotificationRule rule = new NotificationRule(); - rule.setName(triggerConfig.getTriggerType() + " [" + Arrays.toString(targets) + "]"); + rule.setName(triggerConfig.getTriggerType() + " " + Arrays.toString(targets)); rule.setTemplateId(template.getId()); rule.setTriggerType(triggerConfig.getTriggerType()); rule.setTriggerConfig(triggerConfig); diff --git a/application/src/test/java/org/thingsboard/server/service/notification/NotificationRuleApiTest.java b/application/src/test/java/org/thingsboard/server/service/notification/NotificationRuleApiTest.java index dae2d06075..b03a914d1f 100644 --- a/application/src/test/java/org/thingsboard/server/service/notification/NotificationRuleApiTest.java +++ b/application/src/test/java/org/thingsboard/server/service/notification/NotificationRuleApiTest.java @@ -28,6 +28,7 @@ import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.DeviceProfile; import org.thingsboard.server.common.data.EntityType; +import org.thingsboard.server.common.data.UpdateMessage; import org.thingsboard.server.common.data.User; import org.thingsboard.server.common.data.alarm.Alarm; import org.thingsboard.server.common.data.alarm.AlarmSearchStatus; @@ -55,6 +56,7 @@ import org.thingsboard.server.common.data.notification.rule.trigger.AlarmNotific import org.thingsboard.server.common.data.notification.rule.trigger.AlarmNotificationRuleTriggerConfig.AlarmAction; import org.thingsboard.server.common.data.notification.rule.trigger.EntitiesLimitNotificationRuleTriggerConfig; import org.thingsboard.server.common.data.notification.rule.trigger.EntityActionNotificationRuleTriggerConfig; +import org.thingsboard.server.common.data.notification.rule.trigger.NewPlatformVersionNotificationRuleTriggerConfig; import org.thingsboard.server.common.data.notification.rule.trigger.NotificationRuleTriggerType; import org.thingsboard.server.common.data.notification.targets.NotificationTarget; import org.thingsboard.server.common.data.notification.template.NotificationTemplate; @@ -66,9 +68,11 @@ import org.thingsboard.server.common.data.query.FilterPredicateValue; import org.thingsboard.server.common.data.rule.RuleChain; import org.thingsboard.server.common.data.rule.RuleChainMetaData; import org.thingsboard.server.common.data.security.Authority; +import org.thingsboard.server.common.msg.notification.trigger.NewPlatformVersionTrigger; import org.thingsboard.server.dao.notification.NotificationRequestService; import org.thingsboard.server.dao.rule.RuleChainService; import org.thingsboard.server.dao.service.DaoSqlTest; +import org.thingsboard.server.queue.notification.NotificationRuleProcessor; import org.thingsboard.server.service.apiusage.limits.LimitedApi; import org.thingsboard.server.service.apiusage.limits.RateLimitService; import org.thingsboard.server.service.telemetry.AlarmSubscriptionService; @@ -102,6 +106,8 @@ public class NotificationRuleApiTest extends AbstractNotificationApiTest { private RateLimitService rateLimitService; @Autowired private RuleChainService ruleChainService; + @Autowired + private NotificationRuleProcessor notificationRuleProcessor; @Before public void beforeEach() throws Exception { @@ -433,6 +439,34 @@ public class NotificationRuleApiTest extends AbstractNotificationApiTest { assertThat(getWsClient().getLastCountUpdate().getTotalUnreadCount()).isEqualTo(notificationRequestsLimit); } + @Test + public void testNotificationsDeduplication() throws Exception { + loginSysAdmin(); + NewPlatformVersionNotificationRuleTriggerConfig triggerConfig = new NewPlatformVersionNotificationRuleTriggerConfig(); + createNotificationRule(triggerConfig, "Test", "Test", createNotificationTarget(tenantAdminUserId).getId()); + loginTenantAdmin(); + + assertThat(getMyNotifications(false, 100)).size().isZero(); + for (int i = 1; i <= 10; i++) { + notificationRuleProcessor.process(NewPlatformVersionTrigger.builder() + .updateInfo(new UpdateMessage(true, "test", "test", + "test", "test", "test")) + .build()); + TimeUnit.MILLISECONDS.sleep(300); + } + TimeUnit.SECONDS.sleep(5); + assertThat(getMyNotifications(false, 100)).size().isOne(); + + notificationRuleProcessor.process(NewPlatformVersionTrigger.builder() + .updateInfo(new UpdateMessage(true, "CHANGED", "test", + "test", "test", "test")) + .build()); + await().atMost(5, TimeUnit.SECONDS) + .untilAsserted(() -> { + assertThat(getMyNotifications(false, 100)).size().isEqualTo(2); + }); + } + private R checkNotificationAfter(Callable action, BiConsumer check) throws Exception { if (getWsClient().getLastDataUpdate() == null) { getWsClient().subscribeForUnreadNotifications(10).waitForReply(true); diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/CacheConstants.java b/common/data/src/main/java/org/thingsboard/server/common/data/CacheConstants.java index 80d9c4a873..ff0032f435 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/CacheConstants.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/CacheConstants.java @@ -30,6 +30,7 @@ public class CacheConstants { public static final String TENANTS_EXIST_CACHE = "tenantsExist"; public static final String DEVICE_PROFILE_CACHE = "deviceProfiles"; public static final String NOTIFICATION_SETTINGS_CACHE = "notificationSettings"; + public static final String SENT_NOTIFICATIONS_CACHE = "sentNotifications"; public static final String ASSET_PROFILE_CACHE = "assetProfiles"; public static final String ATTRIBUTES_CACHE = "attributes"; diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/notification/rule/trigger/NotificationRuleTriggerType.java b/common/data/src/main/java/org/thingsboard/server/common/data/notification/rule/trigger/NotificationRuleTriggerType.java index 34e42e768c..e79e7f1195 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/notification/rule/trigger/NotificationRuleTriggerType.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/notification/rule/trigger/NotificationRuleTriggerType.java @@ -16,8 +16,10 @@ package org.thingsboard.server.common.data.notification.rule.trigger; import lombok.Getter; +import lombok.RequiredArgsConstructor; @Getter +@RequiredArgsConstructor public enum NotificationRuleTriggerType { ENTITY_ACTION, @@ -26,18 +28,15 @@ public enum NotificationRuleTriggerType { ALARM_ASSIGNMENT, DEVICE_ACTIVITY, RULE_ENGINE_COMPONENT_LIFECYCLE_EVENT, - NEW_PLATFORM_VERSION(false), - ENTITIES_LIMIT(false), - API_USAGE_LIMIT(false); + NEW_PLATFORM_VERSION(false, true), + ENTITIES_LIMIT(false, false), + API_USAGE_LIMIT(false, false); private final boolean tenantLevel; - - NotificationRuleTriggerType(boolean tenantLevel) { - this.tenantLevel = tenantLevel; - } + private final boolean deduplicate; NotificationRuleTriggerType() { - this(true); + this(true, false); } }