Housekeeper: alarms deletion

This commit is contained in:
ViacheslavKlimov 2024-03-18 12:24:58 +02:00
parent b0d4faf798
commit a6719efee5
16 changed files with 123 additions and 40 deletions

View File

@ -25,15 +25,10 @@ import org.springframework.context.annotation.Lazy;
import org.thingsboard.server.cluster.TbClusterService;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.User;
import org.thingsboard.server.common.data.alarm.AlarmInfo;
import org.thingsboard.server.common.data.alarm.AlarmQuery;
import org.thingsboard.server.common.data.exception.ThingsboardErrorCode;
import org.thingsboard.server.common.data.exception.ThingsboardException;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.EntityIdFactory;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.TimePageLink;
import org.thingsboard.server.dao.alarm.AlarmService;
import org.thingsboard.server.dao.customer.CustomerService;
import org.thingsboard.server.dao.edge.EdgeService;
@ -72,13 +67,6 @@ public abstract class AbstractTbEntityService {
@Lazy
private EntitiesVersionControlService vcService;
protected void removeAlarmsByEntityId(TenantId tenantId, EntityId entityId) {
PageData<AlarmInfo> alarms =
alarmService.findAlarms(tenantId, new AlarmQuery(entityId, new TimePageLink(Integer.MAX_VALUE), null, null, null, false));
alarms.getData().stream().map(AlarmInfo::getId).forEach(alarmId -> alarmService.delAlarm(tenantId, alarmId));
}
protected <T> T checkNotNull(T reference) throws ThingsboardException {
return checkNotNull(reference, "Requested item wasn't found!");
}

View File

@ -78,7 +78,6 @@ public class DefaultTbAssetService extends AbstractTbEntityService implements Tb
TenantId tenantId = asset.getTenantId();
AssetId assetId = asset.getId();
try {
removeAlarmsByEntityId(tenantId, assetId);
assetService.deleteAsset(tenantId, assetId);
notificationEntityService.logEntityAction(tenantId, assetId, asset, asset.getCustomerId(), actionType, user, assetId.toString());
tbClusterService.broadcastEntityStateChangeEvent(tenantId, assetId, ComponentLifecycleEvent.DELETED);

View File

@ -97,7 +97,6 @@ public class DefaultTbDeviceService extends AbstractTbEntityService implements T
TenantId tenantId = device.getTenantId();
DeviceId deviceId = device.getId();
try {
removeAlarmsByEntityId(tenantId, deviceId);
deviceService.deleteDevice(tenantId, deviceId);
notificationEntityService.notifyDeleteDevice(tenantId, deviceId, device.getCustomerId(), device,
user, deviceId.toString());

View File

@ -130,7 +130,7 @@ public class HousekeeperService {
}
if (log.isDebugEnabled()) {
log.debug("[{}] {} task {}", task.getTenantId(), isNew(msg.getTask()) ? "Processing" : "Reprocessing", msg.getTask().getValue());
log.debug("[{}] {} {}", task.getTenantId(), isNew(msg.getTask()) ? "Processing" : "Reprocessing", task.getDescription());
}
try {
Future<Object> future = executor.submit(() -> {
@ -140,7 +140,6 @@ public class HousekeeperService {
future.get(taskProcessingTimeout, TimeUnit.MILLISECONDS);
statsService.ifPresent(statsService -> statsService.reportProcessed(task.getTaskType(), msg));
log.debug("[{}] Successfully {} task {}", task.getTenantId(), isNew(msg.getTask()) ? "processed" : "reprocessed", msg.getTask().getValue());
} catch (InterruptedException e) {
throw e;
} catch (Throwable e) {

View File

@ -16,27 +16,35 @@
package org.thingsboard.server.service.housekeeper.processor;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.thingsboard.server.dao.alarm.AlarmService;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.housekeeper.HousekeeperTask;
import org.thingsboard.server.common.data.housekeeper.HousekeeperTaskType;
import org.thingsboard.server.dao.alarm.AlarmService;
@Component
@RequiredArgsConstructor
public class EntityAlarmsDeletionTaskProcessor implements HousekeeperTaskProcessor<HousekeeperTask> {
@Slf4j
public class AlarmsDeletionTaskProcessor implements HousekeeperTaskProcessor<HousekeeperTask> {
private final AlarmService alarmService;
@Override
public void process(HousekeeperTask task) throws Exception {
alarmService.deleteEntityAlarmRecords(task.getTenantId(), task.getEntityId());
// fixme: do we need to remove alarms by originator ???
// fixme: why alarm comments are not deleted ??
EntityType entityType = task.getEntityId().getEntityType();
if (entityType == EntityType.DEVICE || entityType == EntityType.ASSET) {
int count = alarmService.deleteAlarmsByEntityId(task.getTenantId(), task.getEntityId());
log.debug("[{}][{}][{}] Deleted {} alarms", task.getTenantId(), task.getEntityId().getEntityType(), task.getEntityId(), count);
} else {
int count = alarmService.deleteEntityAlarmRecords(task.getTenantId(), task.getEntityId());
log.debug("[{}][{}][{}] Deleted {} entity alarms", task.getTenantId(), task.getEntityId().getEntityType(), task.getEntityId(), count);
}
}
@Override
public HousekeeperTaskType getTaskType() {
return HousekeeperTaskType.DELETE_ENTITY_ALARMS;
return HousekeeperTaskType.DELETE_ALARMS;
}
}

View File

@ -56,7 +56,6 @@ public class AlarmsCleanUpService {
private final TenantService tenantService;
private final AlarmDao alarmDao;
private final AlarmService alarmService;
private final RelationService relationService;
private final EntityActionService entityActionService;
private final PartitionService partitionService;
private final TbTenantProfileCache tenantProfileCache;

View File

@ -32,12 +32,17 @@ import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.EventInfo;
import org.thingsboard.server.common.data.alarm.Alarm;
import org.thingsboard.server.common.data.alarm.AlarmQuery;
import org.thingsboard.server.common.data.alarm.AlarmSeverity;
import org.thingsboard.server.common.data.alarm.EntityAlarm;
import org.thingsboard.server.common.data.asset.Asset;
import org.thingsboard.server.common.data.event.EventType;
import org.thingsboard.server.common.data.event.LifecycleEvent;
import org.thingsboard.server.common.data.housekeeper.HousekeeperTask;
import org.thingsboard.server.common.data.housekeeper.HousekeeperTaskType;
import org.thingsboard.server.common.data.id.AlarmId;
import org.thingsboard.server.common.data.id.AssetId;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.RuleChainId;
import org.thingsboard.server.common.data.id.RuleNodeId;
@ -52,15 +57,19 @@ import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.data.msg.TbNodeConnectionType;
import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.common.data.page.TimePageLink;
import org.thingsboard.server.common.data.relation.EntityRelation;
import org.thingsboard.server.common.data.relation.RelationTypeGroup;
import org.thingsboard.server.common.data.rule.RuleChain;
import org.thingsboard.server.common.data.rule.RuleChainMetaData;
import org.thingsboard.server.common.data.rule.RuleChainType;
import org.thingsboard.server.common.data.rule.RuleNode;
import org.thingsboard.server.common.msg.housekeeper.HousekeeperClient;
import org.thingsboard.server.controller.AbstractControllerTest;
import org.thingsboard.server.dao.alarm.AlarmDao;
import org.thingsboard.server.dao.alarm.AlarmService;
import org.thingsboard.server.dao.attributes.AttributesService;
import org.thingsboard.server.dao.event.EventService;
import org.thingsboard.server.dao.relation.RelationService;
import org.thingsboard.server.dao.rule.RuleChainService;
import org.thingsboard.server.dao.service.DaoSqlTest;
import org.thingsboard.server.dao.timeseries.TimeseriesService;
@ -115,6 +124,10 @@ public class HousekeeperServiceTest extends AbstractControllerTest {
private RuleChainService ruleChainService;
@Autowired
private AlarmService alarmService;
@Autowired
private AlarmDao alarmDao;
@Autowired
private RelationService relationService;
@SpyBean
private TelemetryDeletionTaskProcessor telemetryDeletionTaskProcessor;
@ -200,6 +213,11 @@ public class HousekeeperServiceTest extends AbstractControllerTest {
Device device = createDevice("oi324rujoi", "oi324rujoi");
createRelatedData(device.getId());
Asset asset = createAsset();
createRelatedData(asset.getId());
createRelation(device.getId(), asset.getId());
createAlarm(device.getId(), asset.getId());
RuleChainMetaData ruleChainMetaData = createRuleChain();
RuleChainId ruleChainId = ruleChainMetaData.getRuleChainId();
RuleNodeId ruleNode1Id = ruleChainMetaData.getNodes().get(0).getId();
@ -216,6 +234,7 @@ public class HousekeeperServiceTest extends AbstractControllerTest {
await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> {
verifyNoRelatedData(device.getId());
verifyNoRelatedData(asset.getId());
verifyNoRelatedData(ruleNode1Id);
verifyNoRelatedData(ruleNode2Id);
verifyNoRelatedData(ruleChainId);
@ -301,7 +320,7 @@ public class HousekeeperServiceTest extends AbstractControllerTest {
}
private void verifyNoRelatedData(EntityId entityId) throws Exception {
List<HousekeeperTaskType> expectedTaskTypes = List.of(HousekeeperTaskType.DELETE_TELEMETRY, HousekeeperTaskType.DELETE_ATTRIBUTES, HousekeeperTaskType.DELETE_EVENTS, HousekeeperTaskType.DELETE_ENTITY_ALARMS);
List<HousekeeperTaskType> expectedTaskTypes = List.of(HousekeeperTaskType.DELETE_TELEMETRY, HousekeeperTaskType.DELETE_ATTRIBUTES, HousekeeperTaskType.DELETE_EVENTS, HousekeeperTaskType.DELETE_ALARMS);
for (HousekeeperTaskType taskType : expectedTaskTypes) {
verify(housekeeperClient).submitTask(argThat(task -> task.getTaskType() == taskType && task.getEntityId().equals(entityId)));
}
@ -312,6 +331,11 @@ public class HousekeeperServiceTest extends AbstractControllerTest {
assertThat(getAttribute(entityId, scope, scope + ATTRIBUTE_KEY)).isNull();
}
assertThat(getEvents(entityId)).isEmpty();
assertThat(alarmDao.findEntityAlarmRecordsByEntityId(tenantId, entityId)).isEmpty();
assertThat(alarmService.findAlarms(tenantId, AlarmQuery.builder().pageLink(new TimePageLink(100)).build()).getData())
.filteredOn(alarm -> alarm.getOriginator().equals(entityId)).isEmpty();
assertThat(relationService.findByTo(tenantId, entityId, RelationTypeGroup.COMMON)).isEmpty();
assertThat(relationService.findByFrom(tenantId, entityId, RelationTypeGroup.COMMON)).isEmpty();
}
private void createAttribute(EntityId entityId, String scope, String key) throws Exception {
@ -336,6 +360,27 @@ public class HousekeeperServiceTest extends AbstractControllerTest {
}
private void createRelation(DeviceId to, AssetId from) {
EntityRelation relation = new EntityRelation(from, to, EntityRelation.CONTAINS_TYPE, RelationTypeGroup.COMMON);
relationService.saveRelation(tenantId, relation);
}
private void createAlarm(DeviceId deviceId, EntityId propagatedEntityId) {
Alarm alarm = doPost("/api/alarm", Alarm.builder()
.tenantId(tenantId)
.originator(deviceId)
.severity(AlarmSeverity.CRITICAL)
.type("test alarm for " + deviceId)
.propagate(true)
.build(), Alarm.class);
List<EntityAlarm> entityAlarms = alarmDao.findEntityAlarmRecords(tenantId, alarm.getId());
assertThat(entityAlarms).anyMatch(entityAlarm -> entityAlarm.getEntityId().equals(deviceId) && entityAlarm.getAlarmType().equals(alarm.getType()));
assertThat(entityAlarms).anyMatch(entityAlarm -> entityAlarm.getEntityId().equals(propagatedEntityId) && entityAlarm.getAlarmType().equals(alarm.getType()));
assertThat(alarmService.findAlarms(tenantId, AlarmQuery.builder().pageLink(new TimePageLink(100)).build()).getData())
.filteredOn(a -> a.getOriginator().equals(deviceId)).isNotEmpty();
}
private TsKvEntry getLatestTelemetry(EntityId entityId) throws Exception {
return timeseriesService.findLatest(tenantId, entityId, HousekeeperServiceTest.TELEMETRY_KEY).get().orElse(null);
}
@ -355,6 +400,13 @@ public class HousekeeperServiceTest extends AbstractControllerTest {
.collect(Collectors.toList());
}
private Asset createAsset() {
Asset asset = new Asset();
asset.setName("test");
asset.setType("test");
return doPost("/api/asset", asset, Asset.class);
}
private RuleChainMetaData createRuleChain() {
RuleChain ruleChain = new RuleChain();
ruleChain.setTenantId(tenantId);

View File

@ -81,6 +81,8 @@ public interface AlarmService extends EntityDaoService {
void delAlarmTypes(TenantId tenantId, Set<String> types);
int deleteAlarmsByEntityId(TenantId tenantId, EntityId entityId);
// Other API
Alarm findAlarmById(TenantId tenantId, AlarmId alarmId);
@ -106,7 +108,7 @@ public interface AlarmService extends EntityDaoService {
PageData<AlarmId> findAlarmIdsByAssigneeId(TenantId tenantId, UserId userId, PageLink pageLink);
void deleteEntityAlarmRecords(TenantId tenantId, EntityId entityId);
int deleteEntityAlarmRecords(TenantId tenantId, EntityId entityId);
void deleteEntityAlarmRecordsByTenantId(TenantId tenantId);

View File

@ -69,8 +69,8 @@ public class HousekeeperTask implements Serializable {
return new AlarmsUnassignHousekeeperTask(user);
}
public static HousekeeperTask deleteEntityAlarms(TenantId tenantId, EntityId entityId) {
return new HousekeeperTask(tenantId, entityId, HousekeeperTaskType.DELETE_ENTITY_ALARMS);
public static HousekeeperTask deleteAlarms(TenantId tenantId, EntityId entityId) {
return new HousekeeperTask(tenantId, entityId, HousekeeperTaskType.DELETE_ALARMS);
}
public static HousekeeperTask deleteEntities(TenantId tenantId, EntityType entityType) {

View File

@ -26,8 +26,8 @@ public enum HousekeeperTaskType {
DELETE_ATTRIBUTES("attributes deletion"),
DELETE_TELEMETRY("telemetry deletion"),
DELETE_EVENTS("events deletion"),
UNASSIGN_ALARMS("alarms unassigning"),
DELETE_ENTITY_ALARMS("entity alarms deletion");
DELETE_ALARMS("alarms deletion"),
UNASSIGN_ALARMS("alarms unassigning");
private final String description;

View File

@ -20,6 +20,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.EqualsAndHashCode;
import lombok.ToString;
import java.io.Serializable;
import java.util.Collections;
@ -29,6 +30,7 @@ import java.util.stream.Collectors;
@ApiModel
@EqualsAndHashCode
@ToString
public class PageData<T> implements Serializable {
public static final PageData EMPTY_PAGE_DATA = new PageData<>();

View File

@ -84,7 +84,9 @@ public interface AlarmDao extends Dao<Alarm> {
List<EntityAlarm> findEntityAlarmRecords(TenantId tenantId, AlarmId id);
void deleteEntityAlarmRecords(TenantId tenantId, EntityId entityId);
List<EntityAlarm> findEntityAlarmRecordsByEntityId(TenantId tenantId, EntityId entityId);
int deleteEntityAlarmRecords(TenantId tenantId, EntityId entityId);
void deleteEntityAlarmRecordsByTenantId(TenantId tenantId);

View File

@ -51,6 +51,7 @@ import org.thingsboard.server.common.data.id.UserId;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.common.data.page.SortOrder;
import org.thingsboard.server.common.data.page.TimePageLink;
import org.thingsboard.server.common.data.query.AlarmCountQuery;
import org.thingsboard.server.common.data.query.AlarmData;
import org.thingsboard.server.common.data.query.AlarmDataQuery;
@ -183,16 +184,23 @@ public class BaseAlarmService extends AbstractCachedEntityService<TenantId, Page
@Override
@Transactional
public AlarmApiCallResult delAlarm(TenantId tenantId, AlarmId alarmId, boolean checkAndDeleteAlarmType) {
log.debug("Deleting Alarm Id: {}", alarmId);
AlarmInfo alarm = alarmDao.findAlarmInfoById(tenantId, alarmId.getId());
return deleteAlarm(tenantId, alarm, checkAndDeleteAlarmType);
}
private AlarmApiCallResult deleteAlarm(TenantId tenantId, AlarmInfo alarm, boolean deleteAlarmTypes) {
if (alarm == null) {
return AlarmApiCallResult.builder().successful(false).build();
} else {
log.debug("[{}][{}] Executing deleteAlarm [{}]", tenantId, alarm.getOriginator(), alarm.getId());
var propagationIds = getPropagationEntityIdsList(alarm);
alarmDao.removeById(tenantId, alarm.getUuidId());
eventPublisher.publishEvent(DeleteEntityEvent.builder().tenantId(tenantId)
.entityId(alarmId).entity(alarm).build());
if (checkAndDeleteAlarmType) {
eventPublisher.publishEvent(DeleteEntityEvent.builder()
.tenantId(tenantId)
.entityId(alarm.getId())
.entity(alarm)
.build());
if (deleteAlarmTypes) {
delAlarmTypes(tenantId, Collections.singleton(alarm.getType()));
}
return AlarmApiCallResult.builder().alarm(alarm).deleted(true).successful(true).propagatedEntitiesList(propagationIds).build();
@ -207,6 +215,24 @@ public class BaseAlarmService extends AbstractCachedEntityService<TenantId, Page
}
}
@Override
public int deleteAlarmsByEntityId(TenantId tenantId, EntityId entityId) {
PageLink pageLink = new PageLink(256);
PageData<AlarmInfo> alarms;
int count = 0;
do {
alarms = findAlarms(tenantId, AlarmQuery.builder()
.affectedEntityId(entityId)
.pageLink(new TimePageLink(pageLink, null, null))
.build());
for (AlarmInfo alarm : alarms.getData()) {
deleteAlarm(tenantId, alarm, true);
count++;
}
} while (alarms.hasNext());
return count;
}
private List<EntityId> createEntityAlarmRecords(Alarm alarm) throws ExecutionException, InterruptedException {
Set<EntityId> propagatedEntitiesSet = new LinkedHashSet<>();
propagatedEntitiesSet.add(alarm.getOriginator());
@ -322,9 +348,9 @@ public class BaseAlarmService extends AbstractCachedEntityService<TenantId, Page
}
@Override
public void deleteEntityAlarmRecords(TenantId tenantId, EntityId entityId) {
public int deleteEntityAlarmRecords(TenantId tenantId, EntityId entityId) {
log.trace("Executing deleteEntityAlarms [{}]", entityId);
alarmDao.deleteEntityAlarmRecords(tenantId, entityId);
return alarmDao.deleteEntityAlarmRecords(tenantId, entityId);
}
@Override

View File

@ -62,7 +62,7 @@ public class CleanUpService {
housekeeperClient.submitTask(HousekeeperTask.deleteAttributes(tenantId, entityId));
housekeeperClient.submitTask(HousekeeperTask.deleteTelemetry(tenantId, entityId));
housekeeperClient.submitTask(HousekeeperTask.deleteEvents(tenantId, entityId));
housekeeperClient.submitTask(HousekeeperTask.deleteEntityAlarms(tenantId, entityId));
housekeeperClient.submitTask(HousekeeperTask.deleteAlarms(tenantId, entityId));
}
public void removeTenantEntities(TenantId tenantId, EntityType... entityTypes) {

View File

@ -33,11 +33,13 @@ public interface EntityAlarmRepository extends JpaRepository<EntityAlarmEntity,
@Transactional
@Modifying
@Query("DELETE FROM EntityAlarmEntity e where e.entityId = :entityId")
void deleteByEntityId(@Param("entityId") UUID entityId);
int deleteByEntityId(@Param("entityId") UUID entityId);
@Transactional
@Modifying
@Query("DELETE FROM EntityAlarmEntity a WHERE a.tenantId = :tenantId")
void deleteByTenantId(@Param("tenantId") UUID tenantId);
List<EntityAlarmEntity> findAllByEntityId(UUID entityId);
}

View File

@ -309,8 +309,13 @@ public class JpaAlarmDao extends JpaAbstractDao<AlarmEntity, Alarm> implements A
}
@Override
public void deleteEntityAlarmRecords(TenantId tenantId, EntityId entityId) {
entityAlarmRepository.deleteByEntityId(entityId.getId());
public List<EntityAlarm> findEntityAlarmRecordsByEntityId(TenantId tenantId, EntityId entityId) {
return DaoUtil.convertDataList(entityAlarmRepository.findAllByEntityId(entityId.getId()));
}
@Override
public int deleteEntityAlarmRecords(TenantId tenantId, EntityId entityId) {
return entityAlarmRepository.deleteByEntityId(entityId.getId());
}
@Override