Deduplication for new platform version notification
This commit is contained in:
parent
a87561daa3
commit
a4281fd6d3
@ -15,13 +15,17 @@
|
||||
*/
|
||||
package org.thingsboard.server.service.notification.rule;
|
||||
|
||||
import lombok.Data;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.cache.Cache;
|
||||
import org.springframework.cache.CacheManager;
|
||||
import org.springframework.context.annotation.Lazy;
|
||||
import org.springframework.context.event.EventListener;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.thingsboard.rule.engine.api.NotificationCenter;
|
||||
import org.thingsboard.server.common.data.CacheConstants;
|
||||
import org.thingsboard.server.common.data.EntityType;
|
||||
import org.thingsboard.server.common.data.id.EntityId;
|
||||
import org.thingsboard.server.common.data.id.NotificationRequestId;
|
||||
@ -35,13 +39,13 @@ import org.thingsboard.server.common.data.notification.rule.NotificationRule;
|
||||
import org.thingsboard.server.common.data.notification.rule.trigger.NotificationRuleTriggerConfig;
|
||||
import org.thingsboard.server.common.data.notification.rule.trigger.NotificationRuleTriggerType;
|
||||
import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
|
||||
import org.thingsboard.server.queue.notification.NotificationRuleProcessor;
|
||||
import org.thingsboard.server.common.msg.notification.trigger.NotificationRuleTrigger;
|
||||
import org.thingsboard.server.common.msg.notification.trigger.RuleEngineMsgTrigger;
|
||||
import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
|
||||
import org.thingsboard.server.common.msg.queue.ServiceType;
|
||||
import org.thingsboard.server.dao.notification.NotificationRequestService;
|
||||
import org.thingsboard.server.queue.discovery.PartitionService;
|
||||
import org.thingsboard.server.queue.notification.NotificationRuleProcessor;
|
||||
import org.thingsboard.server.service.apiusage.limits.LimitedApi;
|
||||
import org.thingsboard.server.service.apiusage.limits.RateLimitService;
|
||||
import org.thingsboard.server.service.executors.NotificationExecutorService;
|
||||
@ -49,6 +53,8 @@ import org.thingsboard.server.service.notification.rule.cache.NotificationRulesC
|
||||
import org.thingsboard.server.service.notification.rule.trigger.NotificationRuleTriggerProcessor;
|
||||
import org.thingsboard.server.service.notification.rule.trigger.RuleEngineMsgNotificationRuleTriggerProcessor;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
import java.io.Serializable;
|
||||
import java.util.Collection;
|
||||
import java.util.EnumMap;
|
||||
import java.util.HashMap;
|
||||
@ -71,9 +77,19 @@ public class DefaultNotificationRuleProcessor implements NotificationRuleProcess
|
||||
@Autowired @Lazy
|
||||
private NotificationCenter notificationCenter;
|
||||
private final NotificationExecutorService notificationExecutor;
|
||||
private final CacheManager cacheManager;
|
||||
private Cache sentNotifications;
|
||||
|
||||
private final Map<NotificationRuleTriggerType, NotificationRuleTriggerProcessor> triggerProcessors = new EnumMap<>(NotificationRuleTriggerType.class);
|
||||
|
||||
@PostConstruct
|
||||
private void init() {
|
||||
sentNotifications = cacheManager.getCache(CacheConstants.SENT_NOTIFICATIONS_CACHE);
|
||||
if (sentNotifications == null) {
|
||||
throw new IllegalStateException("Sent notifications cache is not set up");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void process(NotificationRuleTrigger trigger) {
|
||||
NotificationRuleTriggerType triggerType = trigger.getType();
|
||||
@ -126,6 +142,9 @@ public class DefaultNotificationRuleProcessor implements NotificationRuleProcess
|
||||
log.debug("[{}] Rate limit for notification requests per rule was exceeded (rule '{}')", rule.getTenantId(), rule.getName());
|
||||
return;
|
||||
}
|
||||
if (trigger.getType().isDeduplicate() && alreadySent(rule.getId(), trigger)) {
|
||||
return;
|
||||
}
|
||||
|
||||
NotificationInfo notificationInfo = constructNotificationInfo(trigger, triggerConfig);
|
||||
rule.getRecipientsConfig().getTargetsTable().forEach((delay, targets) -> {
|
||||
@ -175,6 +194,23 @@ public class DefaultNotificationRuleProcessor implements NotificationRuleProcess
|
||||
return triggerProcessors.get(triggerConfig.getTriggerType()).constructNotificationInfo(trigger);
|
||||
}
|
||||
|
||||
private boolean alreadySent(NotificationRuleId ruleId, NotificationRuleTrigger trigger) {
|
||||
String key = ruleId + "_" + trigger.getOriginatorEntityId();
|
||||
SentNotification sent = sentNotifications.get(key, SentNotification.class);
|
||||
boolean alreadySent;
|
||||
if (sent != null && sent.getTrigger().equals(trigger)) {
|
||||
alreadySent = true;
|
||||
log.debug("Notification for {} trigger was already sent, ignoring", trigger.getType());
|
||||
// updating cache anyway so that the value is not removed by ttl
|
||||
} else {
|
||||
alreadySent = false;
|
||||
sent = new SentNotification(trigger);
|
||||
}
|
||||
log.trace("[{}] Putting to sentNotifications cache: {}", ruleId, trigger);
|
||||
sentNotifications.put(key, sent);
|
||||
return alreadySent;
|
||||
}
|
||||
|
||||
@EventListener(ComponentLifecycleMsg.class)
|
||||
public void onNotificationRuleDeleted(ComponentLifecycleMsg componentLifecycleMsg) {
|
||||
if (componentLifecycleMsg.getEvent() != ComponentLifecycleEvent.DELETED ||
|
||||
@ -209,4 +245,11 @@ public class DefaultNotificationRuleProcessor implements NotificationRuleProcess
|
||||
RuleEngineMsgTrigger.msgTypeToTriggerType = ruleEngineMsgTypeToTriggerType;
|
||||
}
|
||||
|
||||
@Data
|
||||
private static class SentNotification implements Serializable {
|
||||
private static final long serialVersionUID = 38973480405095422L;
|
||||
|
||||
private final NotificationRuleTrigger trigger;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -18,26 +18,18 @@ package org.thingsboard.server.service.notification.rule.trigger;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.thingsboard.server.common.data.UpdateMessage;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.data.notification.info.NewPlatformVersionNotificationInfo;
|
||||
import org.thingsboard.server.common.data.notification.info.RuleOriginatedNotificationInfo;
|
||||
import org.thingsboard.server.common.data.notification.rule.trigger.NewPlatformVersionNotificationRuleTriggerConfig;
|
||||
import org.thingsboard.server.common.data.notification.rule.trigger.NotificationRuleTriggerType;
|
||||
import org.thingsboard.server.common.msg.notification.trigger.NewPlatformVersionTrigger;
|
||||
import org.thingsboard.server.common.msg.queue.ServiceType;
|
||||
import org.thingsboard.server.queue.discovery.PartitionService;
|
||||
|
||||
@Service
|
||||
@RequiredArgsConstructor
|
||||
public class NewPlatformVersionTriggerProcessor implements NotificationRuleTriggerProcessor<NewPlatformVersionTrigger, NewPlatformVersionNotificationRuleTriggerConfig> {
|
||||
|
||||
private final PartitionService partitionService;
|
||||
|
||||
@Override
|
||||
public boolean matchesFilter(NewPlatformVersionTrigger trigger, NewPlatformVersionNotificationRuleTriggerConfig triggerConfig) {
|
||||
if (!partitionService.isMyPartition(ServiceType.TB_CORE, TenantId.SYS_TENANT_ID, TenantId.SYS_TENANT_ID)) {
|
||||
return false;
|
||||
}
|
||||
return trigger.getUpdateInfo().isUpdateAvailable();
|
||||
}
|
||||
|
||||
|
||||
@ -445,6 +445,9 @@ cache:
|
||||
notificationSettings:
|
||||
timeToLiveInMinutes: "${CACHE_SPECS_NOTIFICATION_SETTINGS_TTL:10}"
|
||||
maxSize: "${CACHE_SPECS_NOTIFICATION_SETTINGS_MAX_SIZE:1000}"
|
||||
sentNotifications:
|
||||
timeToLiveInMinutes: "${CACHE_SPECS_SENT_NOTIFICATIONS_TTL:1440}"
|
||||
maxSize: "${CACHE_SPECS_SENT_NOTIFICATIONS_MAX_SIZE:10000}"
|
||||
attributes:
|
||||
timeToLiveInMinutes: "${CACHE_SPECS_ATTRIBUTES_TTL:1440}"
|
||||
maxSize: "${CACHE_SPECS_ATTRIBUTES_MAX_SIZE:100000}"
|
||||
|
||||
@ -204,7 +204,7 @@ public abstract class AbstractNotificationApiTest extends AbstractControllerTest
|
||||
NotificationTemplate template = createNotificationTemplate(NotificationType.valueOf(triggerConfig.getTriggerType().toString()), subject, text, NotificationDeliveryMethod.WEB);
|
||||
|
||||
NotificationRule rule = new NotificationRule();
|
||||
rule.setName(triggerConfig.getTriggerType() + " [" + Arrays.toString(targets) + "]");
|
||||
rule.setName(triggerConfig.getTriggerType() + " " + Arrays.toString(targets));
|
||||
rule.setTemplateId(template.getId());
|
||||
rule.setTriggerType(triggerConfig.getTriggerType());
|
||||
rule.setTriggerConfig(triggerConfig);
|
||||
|
||||
@ -28,6 +28,7 @@ import org.thingsboard.server.common.data.DataConstants;
|
||||
import org.thingsboard.server.common.data.Device;
|
||||
import org.thingsboard.server.common.data.DeviceProfile;
|
||||
import org.thingsboard.server.common.data.EntityType;
|
||||
import org.thingsboard.server.common.data.UpdateMessage;
|
||||
import org.thingsboard.server.common.data.User;
|
||||
import org.thingsboard.server.common.data.alarm.Alarm;
|
||||
import org.thingsboard.server.common.data.alarm.AlarmSearchStatus;
|
||||
@ -55,6 +56,7 @@ import org.thingsboard.server.common.data.notification.rule.trigger.AlarmNotific
|
||||
import org.thingsboard.server.common.data.notification.rule.trigger.AlarmNotificationRuleTriggerConfig.AlarmAction;
|
||||
import org.thingsboard.server.common.data.notification.rule.trigger.EntitiesLimitNotificationRuleTriggerConfig;
|
||||
import org.thingsboard.server.common.data.notification.rule.trigger.EntityActionNotificationRuleTriggerConfig;
|
||||
import org.thingsboard.server.common.data.notification.rule.trigger.NewPlatformVersionNotificationRuleTriggerConfig;
|
||||
import org.thingsboard.server.common.data.notification.rule.trigger.NotificationRuleTriggerType;
|
||||
import org.thingsboard.server.common.data.notification.targets.NotificationTarget;
|
||||
import org.thingsboard.server.common.data.notification.template.NotificationTemplate;
|
||||
@ -66,9 +68,11 @@ import org.thingsboard.server.common.data.query.FilterPredicateValue;
|
||||
import org.thingsboard.server.common.data.rule.RuleChain;
|
||||
import org.thingsboard.server.common.data.rule.RuleChainMetaData;
|
||||
import org.thingsboard.server.common.data.security.Authority;
|
||||
import org.thingsboard.server.common.msg.notification.trigger.NewPlatformVersionTrigger;
|
||||
import org.thingsboard.server.dao.notification.NotificationRequestService;
|
||||
import org.thingsboard.server.dao.rule.RuleChainService;
|
||||
import org.thingsboard.server.dao.service.DaoSqlTest;
|
||||
import org.thingsboard.server.queue.notification.NotificationRuleProcessor;
|
||||
import org.thingsboard.server.service.apiusage.limits.LimitedApi;
|
||||
import org.thingsboard.server.service.apiusage.limits.RateLimitService;
|
||||
import org.thingsboard.server.service.telemetry.AlarmSubscriptionService;
|
||||
@ -102,6 +106,8 @@ public class NotificationRuleApiTest extends AbstractNotificationApiTest {
|
||||
private RateLimitService rateLimitService;
|
||||
@Autowired
|
||||
private RuleChainService ruleChainService;
|
||||
@Autowired
|
||||
private NotificationRuleProcessor notificationRuleProcessor;
|
||||
|
||||
@Before
|
||||
public void beforeEach() throws Exception {
|
||||
@ -433,6 +439,34 @@ public class NotificationRuleApiTest extends AbstractNotificationApiTest {
|
||||
assertThat(getWsClient().getLastCountUpdate().getTotalUnreadCount()).isEqualTo(notificationRequestsLimit);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNotificationsDeduplication() throws Exception {
|
||||
loginSysAdmin();
|
||||
NewPlatformVersionNotificationRuleTriggerConfig triggerConfig = new NewPlatformVersionNotificationRuleTriggerConfig();
|
||||
createNotificationRule(triggerConfig, "Test", "Test", createNotificationTarget(tenantAdminUserId).getId());
|
||||
loginTenantAdmin();
|
||||
|
||||
assertThat(getMyNotifications(false, 100)).size().isZero();
|
||||
for (int i = 1; i <= 10; i++) {
|
||||
notificationRuleProcessor.process(NewPlatformVersionTrigger.builder()
|
||||
.updateInfo(new UpdateMessage(true, "test", "test",
|
||||
"test", "test", "test"))
|
||||
.build());
|
||||
TimeUnit.MILLISECONDS.sleep(300);
|
||||
}
|
||||
TimeUnit.SECONDS.sleep(5);
|
||||
assertThat(getMyNotifications(false, 100)).size().isOne();
|
||||
|
||||
notificationRuleProcessor.process(NewPlatformVersionTrigger.builder()
|
||||
.updateInfo(new UpdateMessage(true, "CHANGED", "test",
|
||||
"test", "test", "test"))
|
||||
.build());
|
||||
await().atMost(5, TimeUnit.SECONDS)
|
||||
.untilAsserted(() -> {
|
||||
assertThat(getMyNotifications(false, 100)).size().isEqualTo(2);
|
||||
});
|
||||
}
|
||||
|
||||
private <R> R checkNotificationAfter(Callable<R> action, BiConsumer<Notification, R> check) throws Exception {
|
||||
if (getWsClient().getLastDataUpdate() == null) {
|
||||
getWsClient().subscribeForUnreadNotifications(10).waitForReply(true);
|
||||
|
||||
@ -30,6 +30,7 @@ public class CacheConstants {
|
||||
public static final String TENANTS_EXIST_CACHE = "tenantsExist";
|
||||
public static final String DEVICE_PROFILE_CACHE = "deviceProfiles";
|
||||
public static final String NOTIFICATION_SETTINGS_CACHE = "notificationSettings";
|
||||
public static final String SENT_NOTIFICATIONS_CACHE = "sentNotifications";
|
||||
|
||||
public static final String ASSET_PROFILE_CACHE = "assetProfiles";
|
||||
public static final String ATTRIBUTES_CACHE = "attributes";
|
||||
|
||||
@ -16,8 +16,10 @@
|
||||
package org.thingsboard.server.common.data.notification.rule.trigger;
|
||||
|
||||
import lombok.Getter;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
|
||||
@Getter
|
||||
@RequiredArgsConstructor
|
||||
public enum NotificationRuleTriggerType {
|
||||
|
||||
ENTITY_ACTION,
|
||||
@ -26,18 +28,15 @@ public enum NotificationRuleTriggerType {
|
||||
ALARM_ASSIGNMENT,
|
||||
DEVICE_ACTIVITY,
|
||||
RULE_ENGINE_COMPONENT_LIFECYCLE_EVENT,
|
||||
NEW_PLATFORM_VERSION(false),
|
||||
ENTITIES_LIMIT(false),
|
||||
API_USAGE_LIMIT(false);
|
||||
NEW_PLATFORM_VERSION(false, true),
|
||||
ENTITIES_LIMIT(false, false),
|
||||
API_USAGE_LIMIT(false, false);
|
||||
|
||||
private final boolean tenantLevel;
|
||||
|
||||
NotificationRuleTriggerType(boolean tenantLevel) {
|
||||
this.tenantLevel = tenantLevel;
|
||||
}
|
||||
private final boolean deduplicate;
|
||||
|
||||
NotificationRuleTriggerType() {
|
||||
this(true);
|
||||
this(true, false);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user