Performance improvements for processing alarms unassigning task
This commit is contained in:
parent
8aa67eeb44
commit
dd3936ca66
@ -37,11 +37,10 @@ import org.thingsboard.server.common.data.exception.ThingsboardException;
|
||||
import org.thingsboard.server.common.data.id.AlarmId;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.data.id.UserId;
|
||||
import org.thingsboard.server.common.data.page.PageData;
|
||||
import org.thingsboard.server.common.data.page.PageLink;
|
||||
import org.thingsboard.server.service.entitiy.AbstractTbEntityService;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
|
||||
@Service
|
||||
@AllArgsConstructor
|
||||
@ -174,20 +173,20 @@ public class DefaultTbAlarmService extends AbstractTbEntityService implements Tb
|
||||
}
|
||||
|
||||
@Override
|
||||
public int unassignDeletedUserAlarms(TenantId tenantId, UserId userId, String userTitle, long unassignTs) {
|
||||
int count = 0;
|
||||
PageLink pageLink = new PageLink(100);
|
||||
while (true) {
|
||||
PageData<AlarmId> pageData = alarmService.findAlarmIdsByAssigneeId(tenantId, userId, pageLink);
|
||||
List<AlarmId> alarms = pageData.getData();
|
||||
processAlarmsUnassignment(tenantId, userId, userTitle, alarms, unassignTs);
|
||||
|
||||
count += alarms.size();
|
||||
if (!pageData.hasNext()) {
|
||||
break;
|
||||
public void unassignDeletedUserAlarms(TenantId tenantId, UserId userId, String userTitle, List<UUID> alarms, long unassignTs) {
|
||||
for (UUID alarmId : alarms) {
|
||||
log.trace("[{}] Unassigning alarm {} from user {}", tenantId, alarmId, userId);
|
||||
AlarmApiCallResult result = alarmSubscriptionService.unassignAlarm(tenantId, new AlarmId(alarmId), unassignTs);
|
||||
if (!result.isSuccessful()) {
|
||||
log.error("[{}] Cannot unassign alarm {} from user {}", tenantId, alarmId, userId);
|
||||
continue;
|
||||
}
|
||||
if (result.isModified()) {
|
||||
String comment = String.format("Alarm was unassigned because user %s - was deleted", userTitle);
|
||||
addSystemAlarmComment(result.getAlarm(), null, "ASSIGN", comment);
|
||||
logEntityActionService.logEntityAction(result.getAlarm().getTenantId(), result.getAlarm().getOriginator(), result.getAlarm(), result.getAlarm().getCustomerId(), ActionType.ALARM_UNASSIGNED, null);
|
||||
}
|
||||
}
|
||||
return count;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -202,22 +201,6 @@ public class DefaultTbAlarmService extends AbstractTbEntityService implements Tb
|
||||
return ts > 0 ? ts : System.currentTimeMillis();
|
||||
}
|
||||
|
||||
private void processAlarmsUnassignment(TenantId tenantId, UserId userId, String userTitle, List<AlarmId> alarmIds, long unassignTs) {
|
||||
for (AlarmId alarmId : alarmIds) {
|
||||
log.trace("[{}] Unassigning alarm {} userId {}", tenantId, alarmId, userId);
|
||||
AlarmApiCallResult result = alarmSubscriptionService.unassignAlarm(tenantId, alarmId, unassignTs);
|
||||
if (!result.isSuccessful()) {
|
||||
log.error("[{}] Cannot unassign alarm {} userId {}", tenantId, alarmId, userId);
|
||||
continue;
|
||||
}
|
||||
if (result.isModified()) {
|
||||
String comment = String.format("Alarm was unassigned because user %s - was deleted", userTitle);
|
||||
addSystemAlarmComment(result.getAlarm(), null, "ASSIGN", comment);
|
||||
logEntityActionService.logEntityAction(result.getAlarm().getTenantId(), result.getAlarm().getOriginator(), result.getAlarm(), result.getAlarm().getCustomerId(), ActionType.ALARM_UNASSIGNED, null);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void addSystemAlarmComment(Alarm alarm, User user, String subType, String commentText) {
|
||||
addSystemAlarmComment(alarm, user, subType, commentText, null);
|
||||
}
|
||||
|
||||
@ -22,6 +22,9 @@ import org.thingsboard.server.common.data.exception.ThingsboardException;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.data.id.UserId;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
|
||||
public interface TbAlarmService {
|
||||
|
||||
Alarm save(Alarm entity, User user) throws ThingsboardException;
|
||||
@ -38,7 +41,7 @@ public interface TbAlarmService {
|
||||
|
||||
AlarmInfo unassign(Alarm alarm, long unassignTs, User user) throws ThingsboardException;
|
||||
|
||||
int unassignDeletedUserAlarms(TenantId tenantId, UserId userId, String userTitle, long unassignTs);
|
||||
void unassignDeletedUserAlarms(TenantId tenantId, UserId userId, String userTitle, List<UUID> alarms, long unassignTs);
|
||||
|
||||
Boolean delete(Alarm alarm, User user);
|
||||
}
|
||||
|
||||
@ -20,20 +20,47 @@ import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.thingsboard.server.common.data.housekeeper.AlarmsUnassignHousekeeperTask;
|
||||
import org.thingsboard.server.common.data.housekeeper.HousekeeperTaskType;
|
||||
import org.thingsboard.server.common.data.id.AlarmId;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.data.id.UserId;
|
||||
import org.thingsboard.server.common.data.util.TbPair;
|
||||
import org.thingsboard.server.dao.alarm.AlarmService;
|
||||
import org.thingsboard.server.service.entitiy.alarm.TbAlarmService;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
|
||||
@Component
|
||||
@RequiredArgsConstructor
|
||||
@Slf4j
|
||||
public class AlarmsUnassignTaskProcessor extends HousekeeperTaskProcessor<AlarmsUnassignHousekeeperTask> {
|
||||
|
||||
private final TbAlarmService alarmService;
|
||||
private final TbAlarmService tbAlarmService;
|
||||
private final AlarmService alarmService;
|
||||
|
||||
@Override
|
||||
public void process(AlarmsUnassignHousekeeperTask task) throws Exception {
|
||||
int count = alarmService.unassignDeletedUserAlarms(task.getTenantId(), (UserId) task.getEntityId(), task.getUserTitle(), task.getTs());
|
||||
log.debug("[{}][{}] Unassigned {} alarms", task.getTenantId(), task.getEntityId(), count);
|
||||
TenantId tenantId = task.getTenantId();
|
||||
UserId userId = (UserId) task.getEntityId();
|
||||
if (task.getAlarms() == null) {
|
||||
AlarmId lastId = null;
|
||||
long lastCreatedTime = 0;
|
||||
while (true) {
|
||||
List<TbPair<UUID, Long>> alarms = alarmService.findAlarmIdsByAssigneeId(tenantId, userId, lastCreatedTime, lastId, 64);
|
||||
if (alarms.isEmpty()) {
|
||||
break;
|
||||
}
|
||||
housekeeperClient.submitTask(new AlarmsUnassignHousekeeperTask(tenantId, userId, task.getUserTitle(), alarms.stream().map(TbPair::getFirst).toList()));
|
||||
|
||||
TbPair<UUID, Long> last = alarms.get(alarms.size() - 1);
|
||||
lastId = new AlarmId(last.getFirst());
|
||||
lastCreatedTime = last.getSecond();
|
||||
log.debug("[{}][{}] Submitted task for unassigning {} alarms", tenantId, userId, alarms.size());
|
||||
}
|
||||
} else {
|
||||
tbAlarmService.unassignDeletedUserAlarms(tenantId, userId, task.getUserTitle(), task.getAlarms(), task.getTs());
|
||||
log.debug("[{}][{}] Unassigned {} alarms", tenantId, userId, task.getAlarms().size());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@ -37,7 +37,6 @@ import org.thingsboard.server.common.data.exception.ThingsboardException;
|
||||
import org.thingsboard.server.common.data.id.AlarmId;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.data.id.UserId;
|
||||
import org.thingsboard.server.common.data.page.PageData;
|
||||
import org.thingsboard.server.dao.alarm.AlarmService;
|
||||
import org.thingsboard.server.dao.customer.CustomerService;
|
||||
import org.thingsboard.server.dao.edge.EdgeService;
|
||||
@ -46,7 +45,6 @@ import org.thingsboard.server.service.executors.DbCallbackExecutorService;
|
||||
import org.thingsboard.server.service.sync.vc.EntitiesVersionControlService;
|
||||
import org.thingsboard.server.service.telemetry.AlarmSubscriptionService;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
|
||||
@ -164,16 +162,13 @@ public class DefaultTbAlarmServiceTest {
|
||||
AlarmInfo alarm = new AlarmInfo();
|
||||
alarm.setId(new AlarmId(UUID.randomUUID()));
|
||||
|
||||
when(alarmService.findAlarmIdsByAssigneeId(any(), any(), any()))
|
||||
.thenReturn(new PageData<>(List.of(alarm.getId()), 0, 1, false))
|
||||
.thenReturn(new PageData<>(Collections.EMPTY_LIST, 0, 0, false));
|
||||
when(alarmSubscriptionService.unassignAlarm(any(), any(), anyLong()))
|
||||
.thenReturn(AlarmApiCallResult.builder().successful(true).modified(true).alarm(alarm).build());
|
||||
|
||||
User user = new User();
|
||||
user.setEmail("testEmail@gmail.com");
|
||||
user.setId(new UserId(UUID.randomUUID()));
|
||||
service.unassignDeletedUserAlarms(new TenantId(UUID.randomUUID()), user.getId(), user.getTitle(), System.currentTimeMillis());
|
||||
service.unassignDeletedUserAlarms(new TenantId(UUID.randomUUID()), user.getId(), user.getTitle(), List.of(alarm.getUuidId()), System.currentTimeMillis());
|
||||
|
||||
ObjectNode commentNode = JacksonUtil.newObjectNode();
|
||||
commentNode.put("subtype", "ASSIGN");
|
||||
|
||||
@ -55,8 +55,6 @@ import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
|
||||
import org.thingsboard.server.common.data.kv.StringDataEntry;
|
||||
import org.thingsboard.server.common.data.kv.TsKvEntry;
|
||||
import org.thingsboard.server.common.data.msg.TbNodeConnectionType;
|
||||
import org.thingsboard.server.common.data.page.PageData;
|
||||
import org.thingsboard.server.common.data.page.PageLink;
|
||||
import org.thingsboard.server.common.data.page.TimePageLink;
|
||||
import org.thingsboard.server.common.data.relation.EntityRelation;
|
||||
import org.thingsboard.server.common.data.relation.RelationTypeGroup;
|
||||
@ -64,6 +62,7 @@ import org.thingsboard.server.common.data.rule.RuleChain;
|
||||
import org.thingsboard.server.common.data.rule.RuleChainMetaData;
|
||||
import org.thingsboard.server.common.data.rule.RuleChainType;
|
||||
import org.thingsboard.server.common.data.rule.RuleNode;
|
||||
import org.thingsboard.server.common.data.util.TbPair;
|
||||
import org.thingsboard.server.controller.AbstractControllerTest;
|
||||
import org.thingsboard.server.dao.alarm.AlarmDao;
|
||||
import org.thingsboard.server.dao.alarm.AlarmService;
|
||||
@ -201,15 +200,16 @@ public class HousekeeperServiceTest extends AbstractControllerTest {
|
||||
assertThat(alarm.getAssigneeId()).isEqualTo(userId);
|
||||
alarms.add(alarmId);
|
||||
}
|
||||
PageData<AlarmId> assignedAlarms = alarmService.findAlarmIdsByAssigneeId(tenantId, userId, new PageLink(Integer.MAX_VALUE));
|
||||
assertThat(assignedAlarms.getTotalElements()).isEqualTo(count);
|
||||
assertThat(assignedAlarms.getData()).containsAll(alarms);
|
||||
List<AlarmId> assignedAlarms = alarmService.findAlarmIdsByAssigneeId(tenantId, userId, 0, null, 5000).stream()
|
||||
.map(TbPair::getFirst).map(AlarmId::new).toList();
|
||||
assertThat(assignedAlarms).size().isEqualTo(count);
|
||||
assertThat(assignedAlarms).containsAll(alarms);
|
||||
|
||||
doDelete("/api/user/" + userId).andExpect(status().isOk());
|
||||
|
||||
await().atMost(TIMEOUT, TimeUnit.SECONDS).untilAsserted(() -> {
|
||||
verifyNoRelatedData(userId);
|
||||
assertThat(alarmService.findAlarmIdsByAssigneeId(tenantId, userId, new PageLink(1)).getTotalElements()).isZero();
|
||||
assertThat(alarmService.findAlarmIdsByAssigneeId(tenantId, userId, 0, null, 5000)).size().isZero();
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@ -107,7 +107,7 @@ public interface AlarmService extends EntityDaoService {
|
||||
PageData<AlarmData> findAlarmDataByQueryForEntities(TenantId tenantId,
|
||||
AlarmDataQuery query, Collection<EntityId> orderedEntityIds);
|
||||
|
||||
PageData<AlarmId> findAlarmIdsByAssigneeId(TenantId tenantId, UserId userId, PageLink pageLink);
|
||||
List<TbPair<UUID, Long>> findAlarmIdsByAssigneeId(TenantId tenantId, UserId userId, long createdTimeOffset, AlarmId idOffset, int limit);
|
||||
|
||||
List<TbPair<UUID, Long>> findAlarmIdsByOriginatorId(TenantId tenantId, EntityId originatorId, long createdTimeOffset, AlarmId idOffset, int limit);
|
||||
|
||||
@ -118,4 +118,5 @@ public interface AlarmService extends EntityDaoService {
|
||||
long countAlarmsByQuery(TenantId tenantId, CustomerId customerId, AlarmCountQuery query);
|
||||
|
||||
PageData<EntitySubtype> findAlarmTypesByTenantId(TenantId tenantId, PageLink pageLink);
|
||||
|
||||
}
|
||||
|
||||
@ -21,6 +21,11 @@ import lombok.EqualsAndHashCode;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.ToString;
|
||||
import org.thingsboard.server.common.data.User;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.data.id.UserId;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
|
||||
@Data
|
||||
@ToString(callSuper = true)
|
||||
@ -29,10 +34,21 @@ import org.thingsboard.server.common.data.User;
|
||||
public class AlarmsUnassignHousekeeperTask extends HousekeeperTask {
|
||||
|
||||
private String userTitle;
|
||||
private List<UUID> alarms;
|
||||
|
||||
protected AlarmsUnassignHousekeeperTask(User user) {
|
||||
super(user.getTenantId(), user.getId(), HousekeeperTaskType.UNASSIGN_ALARMS);
|
||||
this.userTitle = user.getTitle();
|
||||
this(user.getTenantId(), user.getId(), user.getTitle(), null);
|
||||
}
|
||||
|
||||
public AlarmsUnassignHousekeeperTask(TenantId tenantId, UserId userId, String userTitle, List<UUID> alarms) {
|
||||
super(tenantId, userId, HousekeeperTaskType.UNASSIGN_ALARMS);
|
||||
this.userTitle = userTitle;
|
||||
this.alarms = alarms;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getDescription() {
|
||||
return super.getDescription() + (alarms != null ? " (" + alarms + ")" : "");
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -79,7 +79,7 @@ public interface AlarmDao extends Dao<Alarm> {
|
||||
|
||||
PageData<AlarmId> findAlarmsIdsByEndTsBeforeAndTenantId(Long time, TenantId tenantId, PageLink pageLink);
|
||||
|
||||
PageData<AlarmId> findAlarmIdsByAssigneeId(TenantId tenantId, UUID userId, PageLink pageLink);
|
||||
PageData<TbPair<UUID, Long>> findAlarmIdsByAssigneeId(TenantId tenantId, UserId userId, long createdTimeOffset, AlarmId idOffset, int limit);
|
||||
|
||||
PageData<TbPair<UUID, Long>> findAlarmIdsByOriginatorId(TenantId tenantId, EntityId originatorId, long createdTimeOffset, AlarmId idOffset, int limit);
|
||||
|
||||
|
||||
@ -308,10 +308,10 @@ public class BaseAlarmService extends AbstractCachedEntityService<TenantId, Page
|
||||
}
|
||||
|
||||
@Override
|
||||
public PageData<AlarmId> findAlarmIdsByAssigneeId(TenantId tenantId, UserId userId, PageLink pageLink) {
|
||||
log.trace("[{}] Executing findAlarmIdsByAssigneeId [{}]", tenantId, userId);
|
||||
public List<TbPair<UUID, Long>> findAlarmIdsByAssigneeId(TenantId tenantId, UserId userId, long createdTimeOffset, AlarmId idOffset, int limit) {
|
||||
log.trace("[{}] Executing findAlarmIdsByAssigneeId [{}][{}]", tenantId, userId, idOffset);
|
||||
validateId(userId, id -> "Incorrect userId " + id);
|
||||
return alarmDao.findAlarmIdsByAssigneeId(tenantId, userId.getId(), pageLink);
|
||||
return alarmDao.findAlarmIdsByAssigneeId(tenantId, userId, createdTimeOffset, idOffset, limit).getData();
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -476,4 +476,5 @@ public class BaseAlarmService extends AbstractCachedEntityService<TenantId, Page
|
||||
request.setEndTs(request.getStartTs());
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -331,8 +331,23 @@ public interface AlarmRepository extends JpaRepository<AlarmEntity, UUID> {
|
||||
@Query(value = "SELECT a FROM AlarmInfoEntity a WHERE a.tenantId = :tenantId AND a.id = :alarmId")
|
||||
AlarmInfoEntity findAlarmInfoById(@Param("tenantId") UUID tenantId, @Param("alarmId") UUID alarmId);
|
||||
|
||||
@Query("SELECT a.id FROM AlarmEntity a WHERE a.tenantId = :tenantId AND a.assigneeId = :assigneeId")
|
||||
Page<UUID> findAlarmIdsByAssigneeId(@Param("tenantId") UUID tenantId, @Param("assigneeId") UUID assigneeId, Pageable pageable);
|
||||
// using Slice so that count query is not executed
|
||||
@Query("SELECT new org.thingsboard.server.common.data.util.TbPair(a.id, a.createdTime) " +
|
||||
"FROM AlarmEntity a WHERE a.tenantId = :tenantId AND a.assigneeId = :assigneeId")
|
||||
Slice<TbPair<UUID, Long>> findAlarmIdsByAssigneeId(@Param("tenantId") UUID tenantId,
|
||||
@Param("assigneeId") UUID assigneeId,
|
||||
Pageable pageable);
|
||||
|
||||
// using Slice so that count query is not executed
|
||||
@Query("SELECT new org.thingsboard.server.common.data.util.TbPair(a.id, a.createdTime) " +
|
||||
"FROM AlarmEntity a WHERE a.tenantId = :tenantId AND a.assigneeId = :assigneeId " +
|
||||
"AND (a.createdTime > :createdTimeOffset OR " +
|
||||
"(a.createdTime = :createdTimeOffset AND a.id > :idOffset))")
|
||||
Slice<TbPair<UUID, Long>> findAlarmIdsByAssigneeId(@Param("tenantId") UUID tenantId,
|
||||
@Param("assigneeId") UUID assigneeId,
|
||||
@Param("createdTimeOffset") long createdTimeOffset,
|
||||
@Param("idOffset") UUID idOffset,
|
||||
Pageable pageable);
|
||||
|
||||
// using Slice so that count query is not executed
|
||||
@Query("SELECT new org.thingsboard.server.common.data.util.TbPair(a.id, a.createdTime) " +
|
||||
|
||||
@ -296,9 +296,15 @@ public class JpaAlarmDao extends JpaAbstractDao<AlarmEntity, Alarm> implements A
|
||||
}
|
||||
|
||||
@Override
|
||||
public PageData<AlarmId> findAlarmIdsByAssigneeId(TenantId tenantId, UUID userId, PageLink pageLink) {
|
||||
return DaoUtil.pageToPageData(alarmRepository.findAlarmIdsByAssigneeId(tenantId.getId(), userId, DaoUtil.toPageable(pageLink)))
|
||||
.mapData(AlarmId::new);
|
||||
public PageData<TbPair<UUID, Long>> findAlarmIdsByAssigneeId(TenantId tenantId, UserId userId, long createdTimeOffset, AlarmId idOffset, int limit) {
|
||||
Slice<TbPair<UUID, Long>> result;
|
||||
Pageable pageRequest = toPageable(new PageLink(limit), List.of(SortOrder.of("createdTime", ASC), SortOrder.of("id", ASC)));
|
||||
if (idOffset == null) {
|
||||
result = alarmRepository.findAlarmIdsByAssigneeId(tenantId.getId(), userId.getId(), pageRequest);
|
||||
} else {
|
||||
result = alarmRepository.findAlarmIdsByAssigneeId(tenantId.getId(), userId.getId(), createdTimeOffset, idOffset.getId(), pageRequest);
|
||||
}
|
||||
return DaoUtil.pageToPageData(result);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user