Merge pull request #13534 from AndriiLandiak/fix/resources-shortage-cluster

Improve processing strategy. Fix resources shortage
This commit is contained in:
Viacheslav Klimov 2025-06-06 15:21:09 +03:00 committed by GitHub
commit 4d31138b53
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
15 changed files with 140 additions and 37 deletions

View File

@ -35,6 +35,7 @@ import org.thingsboard.server.common.data.notification.NotificationRequestStatus
import org.thingsboard.server.common.data.notification.info.NotificationInfo;
import org.thingsboard.server.common.data.notification.rule.NotificationRule;
import org.thingsboard.server.common.data.notification.rule.trigger.NotificationRuleTrigger;
import org.thingsboard.server.common.data.notification.rule.trigger.NotificationRuleTrigger.DeduplicationStrategy;
import org.thingsboard.server.common.data.notification.rule.trigger.config.NotificationRuleTriggerConfig;
import org.thingsboard.server.common.data.notification.rule.trigger.config.NotificationRuleTriggerType;
import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
@ -66,8 +67,8 @@ public class DefaultNotificationRuleProcessor implements NotificationRuleProcess
private final NotificationDeduplicationService deduplicationService;
private final PartitionService partitionService;
private final RateLimitService rateLimitService;
@Autowired @Lazy
private NotificationCenter notificationCenter;
@Lazy
private final NotificationCenter notificationCenter;
private final NotificationExecutorService notificationExecutor;
private final Map<NotificationRuleTriggerType, NotificationRuleTriggerProcessor> triggerProcessors = new EnumMap<>(NotificationRuleTriggerType.class);
@ -82,14 +83,11 @@ public class DefaultNotificationRuleProcessor implements NotificationRuleProcess
if (enabledRules.isEmpty()) {
return;
}
if (trigger.deduplicate()) {
enabledRules = new ArrayList<>(enabledRules);
enabledRules.removeIf(rule -> deduplicationService.alreadyProcessed(trigger, rule));
}
final List<NotificationRule> rules = enabledRules;
for (NotificationRule rule : rules) {
List<NotificationRule> rulesToProcess = filterNotificationRules(trigger, enabledRules);
for (NotificationRule rule : rulesToProcess) {
try {
processNotificationRule(rule, trigger);
processNotificationRule(rule, trigger, DeduplicationStrategy.ONLY_MATCHING.equals(trigger.getDeduplicationStrategy()));
} catch (Throwable e) {
log.error("Failed to process notification rule {} for trigger type {} with trigger object {}", rule.getId(), rule.getTriggerType(), trigger, e);
}
@ -100,7 +98,20 @@ public class DefaultNotificationRuleProcessor implements NotificationRuleProcess
});
}
private void processNotificationRule(NotificationRule rule, NotificationRuleTrigger trigger) {
/**
 * Selects the rules that should be processed for the given trigger, based on the trigger's
 * deduplication strategy.
 * <ul>
 *   <li>{@code ONLY_MATCHING} - rules whose trigger config does not match the trigger are dropped;
 *       the deduplication service is consulted only for the rules that do match;</li>
 *   <li>{@code ALL} - every rule is checked with the deduplication service and dropped
 *       if it reports the trigger as already processed;</li>
 *   <li>any other strategy - all rules are kept.</li>
 * </ul>
 * The strategy is read once, since it does not depend on the rule. A failure while evaluating
 * a single rule is logged and that rule is skipped, so that one faulty rule does not prevent
 * the remaining rules from being processed (same policy as the per-rule processing loop).
 *
 * @param trigger      the trigger being processed
 * @param enabledRules enabled rules for the trigger; this list is not modified
 * @return a new list containing the rules to process
 */
private List<NotificationRule> filterNotificationRules(NotificationRuleTrigger trigger, List<NotificationRule> enabledRules) {
    DeduplicationStrategy strategy = trigger.getDeduplicationStrategy();
    List<NotificationRule> rulesToProcess = new ArrayList<>(enabledRules);
    rulesToProcess.removeIf(rule -> {
        try {
            return switch (strategy) {
                // the match check goes first: non-matching rules must not reach the deduplication service
                case ONLY_MATCHING -> !matchesFilter(trigger, rule.getTriggerConfig())
                        || deduplicationService.alreadyProcessed(trigger, rule);
                case ALL -> deduplicationService.alreadyProcessed(trigger, rule);
                default -> false;
            };
        } catch (Exception e) {
            log.error("Failed to filter notification rule {} for trigger type {} with trigger object {}", rule.getId(), rule.getTriggerType(), trigger, e);
            return true;
        }
    });
    return rulesToProcess;
}
private void processNotificationRule(NotificationRule rule, NotificationRuleTrigger trigger, boolean alreadyMatched) {
NotificationRuleTriggerConfig triggerConfig = rule.getTriggerConfig();
log.debug("Processing notification rule '{}' for trigger type {}", rule.getName(), rule.getTriggerType());
@ -114,7 +125,7 @@ public class DefaultNotificationRuleProcessor implements NotificationRuleProcess
return;
}
if (matchesFilter(trigger, triggerConfig)) {
if (alreadyMatched || matchesFilter(trigger, triggerConfig)) {
if (!rateLimitService.checkRateLimit(LimitedApi.NOTIFICATION_REQUESTS_PER_RULE, rule.getTenantId(), rule.getId())) {
log.debug("[{}] Rate limit for notification requests per rule was exceeded (rule '{}')", rule.getTenantId(), rule.getName());
return;

View File

@ -39,7 +39,12 @@ public class ResourcesShortageTriggerProcessor implements NotificationRuleTrigge
@Override
public RuleOriginatedNotificationInfo constructNotificationInfo(ResourcesShortageTrigger trigger) {
return ResourcesShortageNotificationInfo.builder().resource(trigger.getResource().name()).usage(trigger.getUsage()).build();
return ResourcesShortageNotificationInfo.builder()
.resource(trigger.getResource().name())
.usage(trigger.getUsage())
.serviceId(trigger.getServiceId())
.serviceType(trigger.getServiceType())
.build();
}
@Override

View File

@ -185,9 +185,14 @@ public class DefaultSystemInfoService extends TbApplicationEventListener<Partiti
long ts = System.currentTimeMillis();
List<SystemInfoData> clusterSystemData = getSystemData(serviceInfoProvider.getServiceInfo());
clusterSystemData.forEach(data -> {
notificationRuleProcessor.process(ResourcesShortageTrigger.builder().resource(Resource.CPU).usage(data.getCpuUsage()).build());
notificationRuleProcessor.process(ResourcesShortageTrigger.builder().resource(Resource.RAM).usage(data.getMemoryUsage()).build());
notificationRuleProcessor.process(ResourcesShortageTrigger.builder().resource(Resource.STORAGE).usage(data.getDiscUsage()).build());
Arrays.stream(Resource.values()).forEach(resource -> {
notificationRuleProcessor.process(ResourcesShortageTrigger.builder()
.resource(resource)
.serviceId(data.getServiceId())
.serviceType(data.getServiceType())
.usage(extractResourceUsage(data, resource))
.build());
});
});
BasicTsKvEntry clusterDataKv = new BasicTsKvEntry(ts, new JsonDataEntry("clusterSystemData", JacksonUtil.toString(clusterSystemData)));
doSave(Arrays.asList(new BasicTsKvEntry(ts, new BooleanDataEntry("clusterMode", true)), clusterDataKv));
@ -200,17 +205,17 @@ public class DefaultSystemInfoService extends TbApplicationEventListener<Partiti
getCpuUsage().ifPresent(v -> {
long value = (long) v;
tsList.add(new BasicTsKvEntry(ts, new LongDataEntry("cpuUsage", value)));
notificationRuleProcessor.process(ResourcesShortageTrigger.builder().resource(Resource.CPU).usage(value).build());
notificationRuleProcessor.process(ResourcesShortageTrigger.builder().resource(Resource.CPU).usage(value).serviceId(serviceInfoProvider.getServiceId()).serviceType(serviceInfoProvider.getServiceType()).build());
});
getMemoryUsage().ifPresent(v -> {
long value = (long) v;
tsList.add(new BasicTsKvEntry(ts, new LongDataEntry("memoryUsage", value)));
notificationRuleProcessor.process(ResourcesShortageTrigger.builder().resource(Resource.RAM).usage(value).build());
notificationRuleProcessor.process(ResourcesShortageTrigger.builder().resource(Resource.RAM).usage(value).serviceId(serviceInfoProvider.getServiceId()).serviceType(serviceInfoProvider.getServiceType()).build());
});
getDiscSpaceUsage().ifPresent(v -> {
long value = (long) v;
tsList.add(new BasicTsKvEntry(ts, new LongDataEntry("discUsage", value)));
notificationRuleProcessor.process(ResourcesShortageTrigger.builder().resource(Resource.STORAGE).usage(value).build());
notificationRuleProcessor.process(ResourcesShortageTrigger.builder().resource(Resource.STORAGE).usage(value).serviceId(serviceInfoProvider.getServiceId()).serviceType(serviceInfoProvider.getServiceType()).build());
});
getCpuCount().ifPresent(v -> tsList.add(new BasicTsKvEntry(ts, new LongDataEntry("cpuCount", (long) v))));
@ -258,6 +263,14 @@ public class DefaultSystemInfoService extends TbApplicationEventListener<Partiti
return infoData;
}
/**
 * Reads the usage value of the requested resource kind from a system info entry.
 *
 * @param info     the system info entry to read from
 * @param resource the resource whose usage is requested
 * @return the CPU, memory or disc usage of {@code info}, exactly as returned by its getter
 */
private Long extractResourceUsage(SystemInfoData info, Resource resource) {
    final Long usage = switch (resource) {
        case CPU -> info.getCpuUsage();
        case RAM -> info.getMemoryUsage();
        case STORAGE -> info.getDiscUsage();
    };
    return usage;
}
@PreDestroy
private void destroy() {
if (scheduler != null) {

View File

@ -825,6 +825,8 @@ public class NotificationRuleApiTest extends AbstractNotificationApiTest {
notificationRuleProcessor.process(ResourcesShortageTrigger.builder()
.resource(Resource.RAM)
.usage(15L)
.serviceType("serviceType")
.serviceId("serviceId")
.build());
TimeUnit.MILLISECONDS.sleep(300);
}
@ -837,10 +839,48 @@ public class NotificationRuleApiTest extends AbstractNotificationApiTest {
notificationRuleProcessor.process(ResourcesShortageTrigger.builder()
.resource(Resource.RAM)
.usage(5L)
.serviceType("serviceType")
.serviceId("serviceId")
.build());
await("").atMost(5, TimeUnit.SECONDS).untilAsserted(() -> assertThat(getMyNotifications(false, 100)).size().isOne());
}
/**
 * Checks that a resources-shortage rule which initially produces no notification starts
 * producing one once its RAM threshold is lowered and system info is saved again.
 * The final assertions expect exactly one notification, and that it is about RAM.
 */
@Test
public void testNotificationsResourcesShortage_whenThresholdChangeToMatchingFilter_thenSendNotification() throws Exception {
loginSysAdmin();
// NOTE(review): thresholds of 1f are presumably "100%", i.e. never exceeded - confirm against the trigger config semantics
ResourcesShortageNotificationRuleTriggerConfig triggerConfig = ResourcesShortageNotificationRuleTriggerConfig.builder()
.ramThreshold(1f)
.cpuThreshold(1f)
.storageThreshold(1f)
.build();
NotificationRule rule = createNotificationRule(triggerConfig, "Warning: ${resource} shortage", "${resource} shortage", createNotificationTarget(tenantAdminUserId).getId());
loginTenantAdmin();
// saveCurrentMonolithSystemInfo is looked up and invoked reflectively (setAccessible is required to call it from here)
Method method = DefaultSystemInfoService.class.getDeclaredMethod("saveCurrentMonolithSystemInfo");
method.setAccessible(true);
method.invoke(systemInfoService);
// fixed wait before the negative assertion: nothing may have been delivered for the non-matching rule
TimeUnit.SECONDS.sleep(5);
assertThat(getMyNotifications(false, 100)).size().isZero();
loginSysAdmin();
// lower only the RAM threshold; CPU and storage thresholds stay as before
triggerConfig = ResourcesShortageNotificationRuleTriggerConfig.builder()
.ramThreshold(0.01f)
.cpuThreshold(1f)
.storageThreshold(1f)
.build();
rule.setTriggerConfig(triggerConfig);
saveNotificationRule(rule);
loginTenantAdmin();
// second save of system info: this time a notification is expected
method.invoke(systemInfoService);
await().atMost(10, TimeUnit.SECONDS).until(() -> getMyNotifications(false, 100).size() == 1);
Notification notification = getMyNotifications(false, 100).get(0);
assertThat(notification.getSubject()).isEqualTo("Warning: RAM shortage");
assertThat(notification.getText()).isEqualTo("RAM shortage");
}
@Test
public void testNotificationRuleDisabling() throws Exception {
EntityActionNotificationRuleTriggerConfig triggerConfig = new EntityActionNotificationRuleTriggerConfig();

View File

@ -20,6 +20,7 @@ import lombok.Data;
@Data
public class SystemInfoData {
@Schema(description = "Service Id.")
private String serviceId;
@Schema(description = "Service type.")

View File

@ -30,12 +30,16 @@ public class ResourcesShortageNotificationInfo implements RuleOriginatedNotifica
private String resource;
private Long usage;
private String serviceId;
private String serviceType;
@Override
public Map<String, String> getTemplateData() {
return Map.of(
"resource", resource,
"usage", String.valueOf(usage)
"usage", String.valueOf(usage),
"serviceId", serviceId,
"serviceType", serviceType
);
}

View File

@ -23,12 +23,16 @@ import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.notification.rule.trigger.config.NotificationRuleTriggerType;
import java.io.Serial;
import java.util.concurrent.TimeUnit;
@Data
@Builder
public class EdgeCommunicationFailureTrigger implements NotificationRuleTrigger {
@Serial
private static final long serialVersionUID = 2918443863787603524L;
private final TenantId tenantId;
private final CustomerId customerId;
private final EdgeId edgeId;
@ -37,8 +41,8 @@ public class EdgeCommunicationFailureTrigger implements NotificationRuleTrigger
private final String error;
@Override
public boolean deduplicate() {
return true;
public DeduplicationStrategy getDeduplicationStrategy() {
return DeduplicationStrategy.ALL;
}
@Override
@ -60,4 +64,5 @@ public class EdgeCommunicationFailureTrigger implements NotificationRuleTrigger
public EntityId getOriginatorEntityId() {
return edgeId;
}
}

View File

@ -23,12 +23,16 @@ import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.notification.rule.trigger.config.NotificationRuleTriggerType;
import java.io.Serial;
import java.util.concurrent.TimeUnit;
@Data
@Builder
public class EdgeConnectionTrigger implements NotificationRuleTrigger {
@Serial
private static final long serialVersionUID = -261939829962721957L;
private final TenantId tenantId;
private final CustomerId customerId;
private final EdgeId edgeId;
@ -36,8 +40,8 @@ public class EdgeConnectionTrigger implements NotificationRuleTrigger {
private final String edgeName;
@Override
public boolean deduplicate() {
return true;
public DeduplicationStrategy getDeduplicationStrategy() {
return DeduplicationStrategy.ALL;
}
@Override
@ -59,4 +63,5 @@ public class EdgeConnectionTrigger implements NotificationRuleTrigger {
public EntityId getOriginatorEntityId() {
return edgeId;
}
}

View File

@ -22,10 +22,15 @@ import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.notification.rule.trigger.config.NotificationRuleTriggerType;
import java.io.Serial;
@Data
@Builder
public class NewPlatformVersionTrigger implements NotificationRuleTrigger {
@Serial
private static final long serialVersionUID = 3298785969736390092L;
private final UpdateMessage updateInfo;
@Override
@ -45,8 +50,8 @@ public class NewPlatformVersionTrigger implements NotificationRuleTrigger {
@Override
public boolean deduplicate() {
return true;
public DeduplicationStrategy getDeduplicationStrategy() {
return DeduplicationStrategy.ALL;
}
@Override

View File

@ -29,9 +29,8 @@ public interface NotificationRuleTrigger extends Serializable {
EntityId getOriginatorEntityId();
default boolean deduplicate() {
return false;
default DeduplicationStrategy getDeduplicationStrategy() {
return DeduplicationStrategy.NONE;
}
default String getDeduplicationKey() {
@ -43,4 +42,10 @@ public interface NotificationRuleTrigger extends Serializable {
return 0;
}
/**
 * Tells the notification rule processors whether, and for which rules, a trigger
 * is checked against the deduplication service before being processed.
 */
enum DeduplicationStrategy {
// No deduplication: all enabled rules are kept, and the remote processor skips its alreadyProcessed check.
NONE,
// Every enabled rule is checked with the deduplication service and dropped when already processed.
ALL,
// Rules are first matched against the trigger; non-matching rules are dropped and
// only the matching ones are checked with the deduplication service.
ONLY_MATCHING
}
}

View File

@ -22,12 +22,16 @@ import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.limit.LimitedApi;
import org.thingsboard.server.common.data.notification.rule.trigger.config.NotificationRuleTriggerType;
import java.io.Serial;
import java.util.concurrent.TimeUnit;
@Data
@Builder
public class RateLimitsTrigger implements NotificationRuleTrigger {
@Serial
private static final long serialVersionUID = -4423112145409424886L;
private final TenantId tenantId;
private final LimitedApi api;
private final EntityId limitLevel;
@ -45,8 +49,8 @@ public class RateLimitsTrigger implements NotificationRuleTrigger {
@Override
public boolean deduplicate() {
return true;
public DeduplicationStrategy getDeduplicationStrategy() {
return DeduplicationStrategy.ALL;
}
@Override

View File

@ -33,6 +33,8 @@ public class ResourcesShortageTrigger implements NotificationRuleTrigger {
private Resource resource;
private Long usage;
private String serviceId;
private String serviceType;
@Override
public TenantId getTenantId() {
@ -45,13 +47,13 @@ public class ResourcesShortageTrigger implements NotificationRuleTrigger {
}
@Override
public boolean deduplicate() {
return true;
public DeduplicationStrategy getDeduplicationStrategy() {
return DeduplicationStrategy.ONLY_MATCHING;
}
@Override
public String getDeduplicationKey() {
return resource.name();
return String.join(":", resource.name(), serviceId, serviceType);
}
@Override

View File

@ -22,6 +22,7 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean
import org.springframework.stereotype.Service;
import org.thingsboard.server.common.data.JavaSerDesUtil;
import org.thingsboard.server.common.data.notification.rule.trigger.NotificationRuleTrigger;
import org.thingsboard.server.common.data.notification.rule.trigger.NotificationRuleTrigger.DeduplicationStrategy;
import org.thingsboard.server.common.msg.notification.NotificationRuleProcessor;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
@ -47,7 +48,7 @@ public class RemoteNotificationRuleProcessor implements NotificationRuleProcesso
@Override
public void process(NotificationRuleTrigger trigger) {
try {
if (trigger.deduplicate() && deduplicationService.alreadyProcessed(trigger)) {
if (!DeduplicationStrategy.NONE.equals(trigger.getDeduplicationStrategy()) && deduplicationService.alreadyProcessed(trigger)) {
return;
}

View File

@ -376,7 +376,7 @@ public class DefaultNotifications {
public static final DefaultNotification resourcesShortage = DefaultNotification.builder()
.name("Resources shortage notification")
.type(NotificationType.RESOURCES_SHORTAGE)
.subject("Warning: ${resource} shortage")
.subject("Warning: ${resource} shortage for ${serviceId}")
.text("${resource} usage is at ${usage}%.")
.icon("warning")
.rule(DefaultRule.builder()

View File

@ -9,8 +9,10 @@ See the available types and parameters below:
Available template parameters:
* `resource` - the resource name;
* `usage` - the resource usage value;
* `resource` - the resource name (e.g., "CPU", "RAM", "STORAGE");
* `usage` - the current usage value of the resource;
* `serviceId` - the ID of the service that reported the usage (useful for telling nodes apart in a cluster setup);
* `serviceType` - the type of the service that reported the usage (useful in a cluster setup);
Parameter names must be wrapped using `${...}`. For example: `${resource}`.
You may also modify the value of the parameter with one of the suffixes: