Refactoring for alarms clean up; fix invalid total removed counting

This commit is contained in:
ViacheslavKlimov 2023-04-26 13:41:02 +03:00
parent abd843b6f5
commit 6e8ba8af13
4 changed files with 198 additions and 69 deletions

View File

@ -21,11 +21,11 @@ import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.thingsboard.server.common.data.alarm.Alarm;
import org.thingsboard.server.common.data.alarm.AlarmInfo;
import org.thingsboard.server.common.data.audit.ActionType;
import org.thingsboard.server.common.data.id.AlarmId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageDataIterable;
import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration;
import org.thingsboard.server.common.msg.queue.ServiceType;
@ -61,46 +61,48 @@ public class AlarmsCleanUpService {
@Scheduled(initialDelayString = "#{T(org.apache.commons.lang3.RandomUtils).nextLong(0, ${sql.ttl.alarms.checking_interval})}", fixedDelayString = "${sql.ttl.alarms.checking_interval}")
public void cleanUp() {
PageLink tenantsBatchRequest = new PageLink(10_000, 0);
PageLink removalBatchRequest = new PageLink(removalBatchSize, 0 );
PageData<TenantId> tenantsIds;
do {
tenantsIds = tenantService.findTenantsIds(tenantsBatchRequest);
for (TenantId tenantId : tenantsIds.getData()) {
if (!partitionService.resolve(ServiceType.TB_CORE, tenantId, tenantId).isMyPartition()) {
continue;
}
PageDataIterable<TenantId> tenants = new PageDataIterable<>(tenantService::findTenantsIds, 10_000);
for (TenantId tenantId : tenants) {
try {
cleanUp(tenantId);
} catch (Exception e) {
log.warn("Failed to clean up alarms by ttl for tenant {}", tenantId, e);
}
}
}
Optional<DefaultTenantProfileConfiguration> tenantProfileConfiguration = tenantProfileCache.get(tenantId).getProfileConfiguration();
if (tenantProfileConfiguration.isEmpty() || tenantProfileConfiguration.get().getAlarmsTtlDays() == 0) {
continue;
}
private void cleanUp(TenantId tenantId) {
if (!partitionService.resolve(ServiceType.TB_CORE, tenantId, tenantId).isMyPartition()) {
return;
}
long ttl = TimeUnit.DAYS.toMillis(tenantProfileConfiguration.get().getAlarmsTtlDays());
long expirationTime = System.currentTimeMillis() - ttl;
Optional<DefaultTenantProfileConfiguration> tenantProfileConfiguration = tenantProfileCache.get(tenantId).getProfileConfiguration();
if (tenantProfileConfiguration.isEmpty() || tenantProfileConfiguration.get().getAlarmsTtlDays() == 0) {
return;
}
long totalRemoved = 0;
while (true) {
PageData<AlarmId> toRemove = alarmDao.findAlarmsIdsByEndTsBeforeAndTenantId(expirationTime, tenantId, removalBatchRequest);
toRemove.getData().forEach(alarmId -> {
relationService.deleteEntityRelations(tenantId, alarmId);
Alarm alarm = alarmService.deleteAlarm(tenantId, alarmId).getAlarm();
entityActionService.pushEntityActionToRuleEngine(alarm.getOriginator(), alarm, tenantId, null, ActionType.ALARM_DELETE, null);
});
long ttl = TimeUnit.DAYS.toMillis(tenantProfileConfiguration.get().getAlarmsTtlDays());
long expirationTime = System.currentTimeMillis() - ttl;
totalRemoved += toRemove.getTotalElements();
if (!toRemove.hasNext()) {
break;
}
}
if (totalRemoved > 0) {
log.info("Removed {} outdated alarm(s) for tenant {} older than {}", totalRemoved, tenantId, new Date(expirationTime));
PageLink removalBatchRequest = new PageLink(removalBatchSize, 0);
long totalRemoved = 0;
while (true) {
PageData<AlarmId> toRemove = alarmDao.findAlarmsIdsByEndTsBeforeAndTenantId(expirationTime, tenantId, removalBatchRequest);
for (AlarmId alarmId : toRemove.getData()) {
relationService.deleteEntityRelations(tenantId, alarmId);
Alarm alarm = alarmService.delAlarm(tenantId, alarmId).getAlarm();
if (alarm != null) {
entityActionService.pushEntityActionToRuleEngine(alarm.getOriginator(), alarm, tenantId, null, ActionType.ALARM_DELETE, null);
totalRemoved++;
}
}
tenantsBatchRequest = tenantsBatchRequest.nextPageLink();
} while (tenantsIds.hasNext());
if (!toRemove.hasNext()) {
break;
}
}
if (totalRemoved > 0) {
log.info("Removed {} outdated alarm(s) for tenant {} older than {}", totalRemoved, tenantId, new Date(expirationTime));
}
}
}

View File

@ -74,6 +74,7 @@ import org.thingsboard.server.common.data.DeviceTransportType;
import org.thingsboard.server.common.data.SaveDeviceWithCredentialsRequest;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.Tenant;
import org.thingsboard.server.common.data.TenantProfile;
import org.thingsboard.server.common.data.User;
import org.thingsboard.server.common.data.asset.AssetProfile;
import org.thingsboard.server.common.data.device.data.DefaultDeviceConfiguration;
@ -103,16 +104,20 @@ import org.thingsboard.server.common.data.relation.EntityRelation;
import org.thingsboard.server.common.data.security.Authority;
import org.thingsboard.server.common.data.security.DeviceCredentials;
import org.thingsboard.server.common.data.security.DeviceCredentialsType;
import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration;
import org.thingsboard.server.common.data.tenant.profile.TenantProfileData;
import org.thingsboard.server.common.msg.session.FeatureType;
import org.thingsboard.server.config.ThingsboardSecurityConfiguration;
import org.thingsboard.server.dao.Dao;
import org.thingsboard.server.dao.tenant.TenantProfileService;
import org.thingsboard.server.dao.timeseries.TimeseriesService;
import org.thingsboard.server.service.entitiy.tenant.profile.TbTenantProfileService;
import org.thingsboard.server.service.security.auth.jwt.RefreshTokenRequest;
import org.thingsboard.server.service.security.auth.rest.LoginRequest;
import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
@ -123,6 +128,7 @@ import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
@ -202,6 +208,9 @@ public abstract class AbstractWebTest extends AbstractInMemoryStorageTest {
@Autowired
private TenantProfileService tenantProfileService;
@Autowired
private TbTenantProfileService tbTenantProfileService;
@Autowired
public TimeseriesService tsService;
@ -903,18 +912,27 @@ public abstract class AbstractWebTest extends AbstractInMemoryStorageTest {
throw new AssertionError("Unexpected status " + mvcResult.getResponse().getStatus());
}
protected <T> T getFieldValue(Object target, String fieldName) throws Exception {
protected static <T> T getFieldValue(Object target, String fieldName) throws Exception {
Field field = target.getClass().getDeclaredField(fieldName);
field.setAccessible(true);
return (T) field.get(target);
}
protected void setStaticFieldValue(Class<?> targetCls, String fieldName, Object value) throws Exception {
protected static void setStaticFieldValue(Class<?> targetCls, String fieldName, Object value) throws Exception {
Field field = targetCls.getDeclaredField(fieldName);
field.setAccessible(true);
field.set(null, value);
}
protected static void setStaticFinalFieldValue(Class<?> targetCls, String fieldName, Object value) throws Exception {
Field field = targetCls.getDeclaredField(fieldName);
field.setAccessible(true);
Field modifiers = Field.class.getDeclaredField("modifiers");
modifiers.setAccessible(true);
modifiers.setInt(field, field.getModifiers() & ~Modifier.FINAL);
field.set(null, value);
}
protected int getDeviceActorSubscriptionCount(DeviceId deviceId, FeatureType featureType) {
DeviceActorMessageProcessor processor = getDeviceActorProcessor(deviceId);
Map<UUID, SessionInfo> subscriptions = (Map<UUID, SessionInfo>) ReflectionTestUtils.getField(processor, getMapName(featureType));
@ -951,4 +969,13 @@ public abstract class AbstractWebTest extends AbstractInMemoryStorageTest {
return (DeviceActorMessageProcessor) ReflectionTestUtils.getField(actor, "processor");
}
protected void updateDefaultTenantProfile(Consumer<DefaultTenantProfileConfiguration> updater) throws ThingsboardException {
TenantProfile tenantProfile = tenantProfileService.findDefaultTenantProfile(TenantId.SYS_TENANT_ID);
TenantProfileData profileData = tenantProfile.getProfileData();
DefaultTenantProfileConfiguration profileConfiguration = (DefaultTenantProfileConfiguration) profileData.getConfiguration();
updater.accept(profileConfiguration);
tenantProfile.setProfileData(profileData);
tbTenantProfileService.save(TenantId.SYS_TENANT_ID, tenantProfile, null);
}
}

View File

@ -28,7 +28,6 @@ import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.DeviceProfile;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.TenantProfile;
import org.thingsboard.server.common.data.User;
import org.thingsboard.server.common.data.alarm.Alarm;
import org.thingsboard.server.common.data.alarm.AlarmSearchStatus;
@ -42,7 +41,6 @@ import org.thingsboard.server.common.data.device.profile.AlarmConditionKeyType;
import org.thingsboard.server.common.data.device.profile.AlarmRule;
import org.thingsboard.server.common.data.device.profile.DeviceProfileAlarm;
import org.thingsboard.server.common.data.device.profile.SimpleAlarmConditionSpec;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.notification.Notification;
import org.thingsboard.server.common.data.notification.NotificationDeliveryMethod;
import org.thingsboard.server.common.data.notification.NotificationRequest;
@ -68,15 +66,11 @@ import org.thingsboard.server.common.data.query.FilterPredicateValue;
import org.thingsboard.server.common.data.rule.RuleChain;
import org.thingsboard.server.common.data.rule.RuleChainMetaData;
import org.thingsboard.server.common.data.security.Authority;
import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration;
import org.thingsboard.server.common.data.tenant.profile.TenantProfileData;
import org.thingsboard.server.dao.notification.NotificationRequestService;
import org.thingsboard.server.dao.rule.RuleChainService;
import org.thingsboard.server.dao.service.DaoSqlTest;
import org.thingsboard.server.dao.tenant.TenantProfileService;
import org.thingsboard.server.service.apiusage.limits.LimitedApi;
import org.thingsboard.server.service.apiusage.limits.RateLimitService;
import org.thingsboard.server.service.entitiy.tenant.profile.TbTenantProfileService;
import org.thingsboard.server.service.telemetry.AlarmSubscriptionService;
import java.util.ArrayList;
@ -105,10 +99,6 @@ public class NotificationRuleApiTest extends AbstractNotificationApiTest {
@Autowired
private NotificationRequestService notificationRequestService;
@Autowired
private TenantProfileService tenantProfileService;
@Autowired
private TbTenantProfileService tbTenantProfileService;
@Autowired
private RateLimitService rateLimitService;
@Autowired
private RuleChainService ruleChainService;
@ -195,12 +185,12 @@ public class NotificationRuleApiTest extends AbstractNotificationApiTest {
.set("bool", BooleanNode.TRUE);
doPost("/api/plugins/telemetry/" + device.getId() + "/" + DataConstants.SHARED_SCOPE, attr);
await().atMost(2, TimeUnit.SECONDS)
await().atMost(10, TimeUnit.SECONDS)
.until(() -> alarmSubscriptionService.findLatestByOriginatorAndType(tenantId, device.getId(), alarmType).get() != null);
Alarm alarm = alarmSubscriptionService.findLatestByOriginatorAndType(tenantId, device.getId(), alarmType).get();
long ts = System.currentTimeMillis();
await().atMost(7, TimeUnit.SECONDS)
await().atMost(15, TimeUnit.SECONDS)
.until(() -> clients.values().stream().allMatch(client -> client.getLastDataUpdate() != null));
clients.forEach((expectedDelay, wsClient) -> {
Notification notification = wsClient.getLastDataUpdate().getUpdate();
@ -277,7 +267,7 @@ public class NotificationRuleApiTest extends AbstractNotificationApiTest {
.set("bool", BooleanNode.TRUE);
doPost("/api/plugins/telemetry/" + device.getId() + "/" + DataConstants.SHARED_SCOPE, attr);
await().atMost(2, TimeUnit.SECONDS)
await().atMost(10, TimeUnit.SECONDS)
.until(() -> alarmSubscriptionService.findLatestByOriginatorAndType(tenantId, device.getId(), alarmType).get() != null);
Alarm alarm = alarmSubscriptionService.findLatestByOriginatorAndType(tenantId, device.getId(), alarmType).get();
getWsClient().waitForUpdate(true);
@ -287,7 +277,7 @@ public class NotificationRuleApiTest extends AbstractNotificationApiTest {
assertThat(notification.getInfo()).asInstanceOf(type(AlarmNotificationInfo.class))
.extracting(AlarmNotificationInfo::getAlarmId).isEqualTo(alarm.getUuidId());
await().atMost(2, TimeUnit.SECONDS).until(() -> findNotificationRequests(EntityType.ALARM).getTotalElements() == escalationTable.size());
await().atMost(10, TimeUnit.SECONDS).until(() -> findNotificationRequests(EntityType.ALARM).getTotalElements() == escalationTable.size());
NotificationRequestInfo scheduledNotificationRequest = findNotificationRequests(EntityType.ALARM).getData().stream()
.filter(NotificationRequest::isScheduled)
.findFirst().orElse(null);
@ -304,18 +294,15 @@ public class NotificationRuleApiTest extends AbstractNotificationApiTest {
@Test
public void testNotificationRuleProcessing_entitiesLimit() throws Exception {
TenantProfile tenantProfile = tenantProfileService.findDefaultTenantProfile(TenantId.SYS_TENANT_ID);
TenantProfileData profileData = tenantProfile.getProfileData();
DefaultTenantProfileConfiguration profileConfiguration = (DefaultTenantProfileConfiguration) profileData.getConfiguration();
int limit = 5;
profileConfiguration.setMaxDevices(limit);
profileConfiguration.setMaxAssets(limit);
profileConfiguration.setMaxCustomers(limit);
profileConfiguration.setMaxUsers(limit);
profileConfiguration.setMaxDashboards(limit);
profileConfiguration.setMaxRuleChains(limit);
tenantProfile.setProfileData(profileData);
tbTenantProfileService.save(TenantId.SYS_TENANT_ID, tenantProfile, null);
updateDefaultTenantProfile(profileConfiguration -> {
profileConfiguration.setMaxDevices(limit);
profileConfiguration.setMaxAssets(limit);
profileConfiguration.setMaxCustomers(limit);
profileConfiguration.setMaxUsers(limit);
profileConfiguration.setMaxDashboards(limit);
profileConfiguration.setMaxRuleChains(limit);
});
EntitiesLimitNotificationRuleTriggerConfig triggerConfig = EntitiesLimitNotificationRuleTriggerConfig.builder()
.entityTypes(null).threshold(0.8f)
@ -403,12 +390,9 @@ public class NotificationRuleApiTest extends AbstractNotificationApiTest {
@Test
public void testNotificationRequestsPerRuleRateLimits() throws Exception {
int notificationRequestsLimit = 10;
TenantProfile tenantProfile = tenantProfileService.findDefaultTenantProfile(TenantId.SYS_TENANT_ID);
TenantProfileData profileData = tenantProfile.getProfileData();
DefaultTenantProfileConfiguration profileConfiguration = (DefaultTenantProfileConfiguration) profileData.getConfiguration();
profileConfiguration.setTenantNotificationRequestsPerRuleRateLimit(notificationRequestsLimit + ":300");
tenantProfile.setProfileData(profileData);
tbTenantProfileService.save(TenantId.SYS_TENANT_ID, tenantProfile, null);
updateDefaultTenantProfile(profileConfiguration -> {
profileConfiguration.setTenantNotificationRequestsPerRuleRateLimit(notificationRequestsLimit + ":300");
});
NotificationRule rule = new NotificationRule();
rule.setName("Device created");
@ -431,7 +415,7 @@ public class NotificationRuleApiTest extends AbstractNotificationApiTest {
String name = "device " + i;
createDevice(name, name);
}
await().atMost(30, TimeUnit.SECONDS)
await().atMost(40, TimeUnit.SECONDS)
.untilAsserted(() -> {
assertThat(getMyNotifications(false, 100)).size().isEqualTo(notificationRequestsLimit);
});

View File

@ -0,0 +1,116 @@
/**
* Copyright © 2016-2023 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.ttl;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.mock.mockito.SpyBean;
import org.springframework.test.context.TestPropertySource;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.alarm.Alarm;
import org.thingsboard.server.common.data.alarm.AlarmSeverity;
import org.thingsboard.server.common.data.id.AlarmId;
import org.thingsboard.server.controller.AbstractControllerTest;
import org.thingsboard.server.dao.alarm.AlarmDao;
import org.thingsboard.server.dao.alarm.AlarmService;
import org.thingsboard.server.dao.service.DaoSqlTest;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.startsWith;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
@DaoSqlTest
@TestPropertySource(properties = {
"sql.ttl.alarms.removal_batch_size=5"
})
public class AlarmsCleanUpServiceTest extends AbstractControllerTest {
@Autowired
private AlarmsCleanUpService alarmsCleanUpService;
@SpyBean
private AlarmService alarmService;
@Autowired
private AlarmDao alarmDao;
private static Logger cleanUpServiceLogger;
@BeforeClass
public static void before() throws Exception {
cleanUpServiceLogger = Mockito.spy(LoggerFactory.getLogger(AlarmsCleanUpService.class));
setStaticFinalFieldValue(AlarmsCleanUpService.class, "log", cleanUpServiceLogger);
}
@Test
public void testAlarmsCleanUp() throws Exception {
int ttlDays = 1;
updateDefaultTenantProfile(profileConfiguration -> {
profileConfiguration.setAlarmsTtlDays(ttlDays);
});
loginTenantAdmin();
Device device = createDevice("device_0", "device_0");
int count = 100;
long ts = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(ttlDays) - TimeUnit.MINUTES.toMillis(10);
List<AlarmId> outdatedAlarms = new ArrayList<>();
List<AlarmId> freshAlarms = new ArrayList<>();
for (int i = 0; i < count; i++) {
Alarm alarm = Alarm.builder()
.tenantId(tenantId)
.originator(device.getId())
.cleared(false)
.acknowledged(false)
.severity(AlarmSeverity.CRITICAL)
.type("outdated_alarm_" + i)
.startTs(ts)
.endTs(ts)
.build();
alarm.setId(new AlarmId(UUID.randomUUID()));
alarm.setCreatedTime(ts);
outdatedAlarms.add(alarmDao.save(tenantId, alarm).getId());
alarm.setType("fresh_alarm_" + i);
alarm.setStartTs(System.currentTimeMillis());
alarm.setEndTs(alarm.getStartTs());
alarm.setId(new AlarmId(UUID.randomUUID()));
alarm.setCreatedTime(alarm.getStartTs());
freshAlarms.add(alarmDao.save(tenantId, alarm).getId());
}
alarmsCleanUpService.cleanUp();
for (AlarmId outdatedAlarm : outdatedAlarms) {
verify(alarmService).delAlarm(eq(tenantId), eq(outdatedAlarm));
}
for (AlarmId freshAlarm : freshAlarms) {
verify(alarmService, never()).delAlarm(eq(tenantId), eq(freshAlarm));
}
verify(cleanUpServiceLogger).info(startsWith("Removed {} outdated alarm"), eq((long) count), eq(tenantId), any());
}
}