Refactored repository method for getting assigned alarms, added limitations, logs and adopted tests for event based model alarms unassigning on user removing

This commit is contained in:
imbeacon 2023-08-16 09:09:40 +03:00
parent 19c5e50884
commit 4b8db9f37d
11 changed files with 134 additions and 80 deletions

View File

@ -42,6 +42,7 @@ import org.thingsboard.server.dao.eventsourcing.DeleteEntityEvent;
import org.thingsboard.server.dao.housekeeper.HouseKeeperService;
import org.thingsboard.server.service.entitiy.AbstractTbEntityService;
import java.util.ArrayList;
import java.util.List;
@Service
@ -224,43 +225,50 @@ public class DefaultTbAlarmService extends AbstractTbEntityService implements Tb
}
@Override
public List<AlarmId> unassignDeletedUserAlarms(User user) {
List<AlarmId> alarmIds = alarmService.findAlarmIdsByAssigneeId(user.getId());
for (AlarmId alarmId : alarmIds) {
log.trace("[{}] Unassigning alarm {} userId {}", user.getTenantId().getId(), alarmId.getId(), user.getId().getId());
AlarmApiCallResult result = alarmSubscriptionService.unassignAlarm(user.getTenantId(), alarmId, System.currentTimeMillis());
Alarm alarm = result.getAlarm();
if (!result.isSuccessful()) {
continue;
}
if (result.isModified()) {
try {
AlarmComment alarmComment = AlarmComment.builder()
.alarmId(alarmId)
.type(AlarmCommentType.SYSTEM)
.comment(JacksonUtil.newObjectNode()
.put("text", String.format("Alarm was unassigned because user with id %s - was deleted",
(user.getFirstName() == null || user.getLastName() == null) ? user.getName() : user.getFirstName() + " " + user.getLastName()))
.put("userId", user.getId().toString())
.put("subtype", "ASSIGN"))
.build();
alarmCommentService.saveAlarmComment(alarm, alarmComment, null);
} catch (ThingsboardException e) {
log.error("Failed to save alarm comment", e);
public List<AlarmId> unassignDeletedUserAlarms(TenantId tenantId, User user) {
List<AlarmId> totalAlarmIds = new ArrayList<>();
List<AlarmId> alarmIds;
do {
alarmIds = alarmService.findAlarmIdsByAssigneeId(tenantId, user.getId(), 100);
for (AlarmId alarmId : alarmIds) {
log.trace("[{}] Unassigning alarm {} userId {}", tenantId, alarmId.getId(), user.getId().getId());
AlarmApiCallResult result = alarmSubscriptionService.unassignAlarm(user.getTenantId(), alarmId, System.currentTimeMillis());
Alarm alarm = result.getAlarm();
if (!result.isSuccessful()) {
continue;
}
notificationEntityService.logEntityAction(alarm.getTenantId(), alarm.getOriginator(), result.getAlarm(),
alarm.getCustomerId(), ActionType.ALARM_UNASSIGNED, null);
if (result.isModified()) {
try {
AlarmComment alarmComment = AlarmComment.builder()
.alarmId(alarmId)
.type(AlarmCommentType.SYSTEM)
.comment(JacksonUtil.newObjectNode()
.put("text", String.format("Alarm was unassigned because user with id %s - was deleted",
(user.getFirstName() == null || user.getLastName() == null) ? user.getName() : user.getFirstName() + " " + user.getLastName()))
.put("userId", user.getId().toString())
.put("subtype", "ASSIGN"))
.build();
alarmCommentService.saveAlarmComment(alarm, alarmComment, null);
} catch (ThingsboardException e) {
log.error("Failed to save alarm comment", e);
}
notificationEntityService.logEntityAction(alarm.getTenantId(), alarm.getOriginator(), result.getAlarm(),
alarm.getCustomerId(), ActionType.ALARM_UNASSIGNED, null);
}
totalAlarmIds.addAll(alarmIds);
}
}
return alarmIds;
while (!alarmIds.isEmpty());
return totalAlarmIds;
}
@TransactionalEventListener(fallbackExecution = true)
public void handleEvent(DeleteEntityEvent<?> event) {
log.trace("[{}] DeleteEntityEvent called: {}", event.getTenantId(), event);
log.trace("[{}] DeleteEntityEvent handler: {}", event.getTenantId(), event);
EntityId entityId = event.getEntityId();
if (EntityType.USER.equals(entityId.getEntityType())) {
housekeeper.unassignDeletedUserAlarms((User) event.getEntity());
housekeeper.unassignDeletedUserAlarms(event.getTenantId(), (User) event.getEntity());
}
}

View File

@ -20,6 +20,7 @@ import org.thingsboard.server.common.data.alarm.Alarm;
import org.thingsboard.server.common.data.alarm.AlarmInfo;
import org.thingsboard.server.common.data.exception.ThingsboardException;
import org.thingsboard.server.common.data.id.AlarmId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.id.UserId;
import java.util.List;
@ -40,7 +41,7 @@ public interface TbAlarmService {
AlarmInfo unassign(Alarm alarm, long unassignTs, User user) throws ThingsboardException;
List<AlarmId> unassignDeletedUserAlarms(User user);
List<AlarmId> unassignDeletedUserAlarms(TenantId tenantId, User user);
Boolean delete(Alarm alarm, User user);
}

View File

@ -15,16 +15,20 @@
*/
package org.thingsboard.server.service.housekeeper;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.common.data.User;
import org.thingsboard.server.common.data.id.AlarmId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.dao.housekeeper.HouseKeeperService;
import org.thingsboard.server.service.entitiy.alarm.TbAlarmService;
@ -45,27 +49,43 @@ public class InMemoryHouseKeeperServiceService implements HouseKeeperService {
ListeningExecutorService executor;
AtomicInteger queueSize = new AtomicInteger();
AtomicInteger totalProcessedCounter = new AtomicInteger();
@PostConstruct
public void init() {
executor = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("housekeeper")));
log.debug("Starting HouseKeeper service");
executor = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("housekeeper")));
}
@PreDestroy
public void destroy() {
if (executor != null) {
log.debug("Stopping HouseKeeper service");
executor.shutdown();
}
}
@Override
public ListenableFuture<List<AlarmId>> unassignDeletedUserAlarms(User user) {
log.debug("[{}][{}] unassignDeletedUserAlarms submitting, pending queue size: {} ", user.getTenantId().getId(), user.getId().getId(), queueSize.get());
public ListenableFuture<List<AlarmId>> unassignDeletedUserAlarms(TenantId tenantId, User user) {
log.debug("[{}][{}] unassignDeletedUserAlarms submitting, pending queue size: {} ", tenantId, user.getId().getId(), queueSize.get());
queueSize.incrementAndGet();
ListenableFuture<List<AlarmId>> future = executor.submit(() -> alarmService.unassignDeletedUserAlarms(user));
future.addListener(() -> {
queueSize.decrementAndGet();
log.debug("[{}][{}] unassignDeletedUserAlarms finished, pending queue size: {} ", user.getTenantId().getId(), user.getId().getId(), queueSize.get());
ListenableFuture<List<AlarmId>> future = executor.submit(() -> alarmService.unassignDeletedUserAlarms(tenantId, user));
Futures.addCallback(future, new FutureCallback<>() {
@Override
public void onSuccess(List<AlarmId> alarmIds) {
queueSize.decrementAndGet();
totalProcessedCounter.incrementAndGet();
log.debug("[{}][{}] unassignDeletedUserAlarms finished, pending queue size: {}, total processed count: {} ",
tenantId, user.getId().getId(), queueSize.get(), totalProcessedCounter.get());
}
@Override
public void onFailure(@NotNull Throwable throwable) {
queueSize.decrementAndGet();
totalProcessedCounter.incrementAndGet();
log.error("[{}][{}] unassignDeletedUserAlarms failed, pending queue size: {}, total processed count: {}",
tenantId, user.getId().getId(), queueSize.get(), totalProcessedCounter.get(), throwable);
}
}, MoreExecutors.directExecutor());
return future;
}

View File

@ -29,6 +29,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Primary;
import org.springframework.test.context.ContextConfiguration;
import org.testcontainers.shaded.org.awaitility.Awaitility;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.EntityType;
@ -46,6 +47,7 @@ import org.thingsboard.server.dao.service.DaoSqlTest;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import static org.hamcrest.Matchers.containsString;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
@ -568,10 +570,13 @@ public class AlarmControllerTest extends AbstractControllerTest {
long beforeAssignmentTs = System.currentTimeMillis();
doPost("/api/alarm/" + alarm.getId() + "/assign/" + savedUser.getId().getId()).andExpect(status().isOk());
AlarmInfo foundAlarm = doGet("/api/alarm/info/" + alarm.getId(), AlarmInfo.class);
Assert.assertNotNull(foundAlarm);
Assert.assertEquals(savedUser.getId(), foundAlarm.getAssigneeId());
Assert.assertTrue(foundAlarm.getAssignTs() >= beforeAssignmentTs);
Alarm finalAlarm = alarm;
var alarmObj = new Object() {
AlarmInfo foundAlarm = doGet("/api/alarm/info/" + finalAlarm.getId(), AlarmInfo.class);
};
Assert.assertNotNull(alarmObj.foundAlarm);
Assert.assertEquals(savedUser.getId(), alarmObj.foundAlarm.getAssigneeId());
Assert.assertTrue(alarmObj.foundAlarm.getAssignTs() >= beforeAssignmentTs);
beforeAssignmentTs = System.currentTimeMillis();
@ -583,10 +588,14 @@ public class AlarmControllerTest extends AbstractControllerTest {
loginDifferentTenant();
foundAlarm = doGet("/api/alarm/info/" + alarm.getId(), AlarmInfo.class);
Assert.assertNotNull(foundAlarm);
Assert.assertNull(foundAlarm.getAssigneeId());
Assert.assertTrue(foundAlarm.getAssignTs() >= beforeAssignmentTs);
Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> {
alarmObj.foundAlarm = doGet("/api/alarm/info/" + finalAlarm.getId(), AlarmInfo.class);
return alarmObj.foundAlarm.getAssigneeId() == null;
});
Assert.assertNotNull(alarmObj.foundAlarm);
Assert.assertNull(alarmObj.foundAlarm.getAssigneeId());
Assert.assertTrue(alarmObj.foundAlarm.getAssignTs() >= beforeAssignmentTs);
}
@Test
@ -617,10 +626,13 @@ public class AlarmControllerTest extends AbstractControllerTest {
long beforeAssignmentTs = System.currentTimeMillis();
doPost("/api/alarm/" + alarm.getId() + "/assign/" + savedUser.getId().getId()).andExpect(status().isOk());
AlarmInfo foundAlarm = doGet("/api/alarm/info/" + alarm.getId(), AlarmInfo.class);
Assert.assertNotNull(foundAlarm);
Assert.assertEquals(savedUser.getId(), foundAlarm.getAssigneeId());
Assert.assertTrue(foundAlarm.getAssignTs() >= beforeAssignmentTs);
Alarm finalAlarm = alarm;
var alarmObj = new Object() {
AlarmInfo foundAlarm = doGet("/api/alarm/info/" + finalAlarm.getId(), AlarmInfo.class);
};
Assert.assertNotNull(alarmObj.foundAlarm);
Assert.assertEquals(savedUser.getId(), alarmObj.foundAlarm.getAssigneeId());
Assert.assertTrue(alarmObj.foundAlarm.getAssignTs() >= beforeAssignmentTs);
beforeAssignmentTs = System.currentTimeMillis();
@ -628,10 +640,14 @@ public class AlarmControllerTest extends AbstractControllerTest {
doDelete("/api/user/" + savedUser.getId().getId()).andExpect(status().isOk());
foundAlarm = doGet("/api/alarm/info/" + alarm.getId(), AlarmInfo.class);
Assert.assertNotNull(foundAlarm);
Assert.assertNull(foundAlarm.getAssigneeId());
Assert.assertTrue(foundAlarm.getAssignTs() >= beforeAssignmentTs);
Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> {
alarmObj.foundAlarm = doGet("/api/alarm/info/" + finalAlarm.getId(), AlarmInfo.class);
return alarmObj.foundAlarm.getAssigneeId() == null;
});
Assert.assertNotNull(alarmObj.foundAlarm);
Assert.assertNull(alarmObj.foundAlarm.getAssigneeId());
Assert.assertTrue(alarmObj.foundAlarm.getAssignTs() >= beforeAssignmentTs);
}
@Test
@ -669,10 +685,13 @@ public class AlarmControllerTest extends AbstractControllerTest {
long beforeAssignmentTs = System.currentTimeMillis();
doPost("/api/alarm/" + alarm.getId() + "/assign/" + savedUser.getId().getId()).andExpect(status().isOk());
AlarmInfo foundAlarm = doGet("/api/alarm/info/" + alarm.getId(), AlarmInfo.class);
Assert.assertNotNull(foundAlarm);
Assert.assertEquals(savedUser.getId(), foundAlarm.getAssigneeId());
Assert.assertTrue(foundAlarm.getAssignTs() >= beforeAssignmentTs);
Alarm finalAlarm = alarm;
var alarmObj = new Object() {
AlarmInfo foundAlarm = doGet("/api/alarm/info/" + finalAlarm.getId(), AlarmInfo.class);
};
Assert.assertNotNull(alarmObj.foundAlarm);
Assert.assertEquals(savedUser.getId(), alarmObj.foundAlarm.getAssigneeId());
Assert.assertTrue(alarmObj.foundAlarm.getAssignTs() >= beforeAssignmentTs);
beforeAssignmentTs = System.currentTimeMillis();
@ -680,10 +699,14 @@ public class AlarmControllerTest extends AbstractControllerTest {
doDelete("/api/customer/" + differentTenantCustomerId.getId()).andExpect(status().isOk());
foundAlarm = doGet("/api/alarm/info/" + alarm.getId(), AlarmInfo.class);
Assert.assertNotNull(foundAlarm);
Assert.assertNull(foundAlarm.getAssigneeId());
Assert.assertTrue(foundAlarm.getAssignTs() >= beforeAssignmentTs);
Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> {
alarmObj.foundAlarm = doGet("/api/alarm/info/" + finalAlarm.getId(), AlarmInfo.class);
return alarmObj.foundAlarm.getAssigneeId() == null;
});
Assert.assertNotNull(alarmObj.foundAlarm);
Assert.assertNull(alarmObj.foundAlarm.getAssigneeId());
Assert.assertTrue(alarmObj.foundAlarm.getAssignTs() >= beforeAssignmentTs);
}
@Test

View File

@ -119,7 +119,7 @@ public interface AlarmService extends EntityDaoService {
PageData<AlarmData> findAlarmDataByQueryForEntities(TenantId tenantId,
AlarmDataQuery query, Collection<EntityId> orderedEntityIds);
List<AlarmId> findAlarmIdsByAssigneeId(UserId userId);
List<AlarmId> findAlarmIdsByAssigneeId(TenantId tenantId, UserId userId, int limit);
void deleteEntityAlarmRelations(TenantId tenantId, EntityId entityId);

View File

@ -18,10 +18,12 @@ package org.thingsboard.server.dao.housekeeper;
import com.google.common.util.concurrent.ListenableFuture;
import org.thingsboard.server.common.data.User;
import org.thingsboard.server.common.data.id.AlarmId;
import org.thingsboard.server.common.data.id.TenantId;
import java.util.List;
public interface HouseKeeperService {
ListenableFuture<List<AlarmId>> unassignDeletedUserAlarms(User user);
ListenableFuture<List<AlarmId>> unassignDeletedUserAlarms(TenantId tenantId, User user);
}

View File

@ -31,27 +31,27 @@ import java.util.List;
public interface UserService extends EntityDaoService {
User findUserById(TenantId tenantId, UserId userId);
User findUserById(TenantId tenantId, UserId userId);
ListenableFuture<User> findUserByIdAsync(TenantId tenantId, UserId userId);
ListenableFuture<User> findUserByIdAsync(TenantId tenantId, UserId userId);
User findUserByEmail(TenantId tenantId, String email);
User findUserByEmail(TenantId tenantId, String email);
User findUserByTenantIdAndEmail(TenantId tenantId, String email);
User saveUser(TenantId tenantId, User user);
User saveUser(TenantId tenantId, User user);
UserCredentials findUserCredentialsByUserId(TenantId tenantId, UserId userId);
UserCredentials findUserCredentialsByUserId(TenantId tenantId, UserId userId);
UserCredentials findUserCredentialsByActivateToken(TenantId tenantId, String activateToken);
UserCredentials findUserCredentialsByActivateToken(TenantId tenantId, String activateToken);
UserCredentials findUserCredentialsByResetToken(TenantId tenantId, String resetToken);
UserCredentials findUserCredentialsByResetToken(TenantId tenantId, String resetToken);
UserCredentials saveUserCredentials(TenantId tenantId, UserCredentials userCredentials);
UserCredentials saveUserCredentials(TenantId tenantId, UserCredentials userCredentials);
UserCredentials activateUserCredentials(TenantId tenantId, String activateToken, String password);
UserCredentials activateUserCredentials(TenantId tenantId, String activateToken, String password);
UserCredentials requestPasswordReset(TenantId tenantId, String email);
UserCredentials requestPasswordReset(TenantId tenantId, String email);
UserCredentials requestExpiredPasswordReset(TenantId tenantId, UserCredentialsId userCredentialsId);

View File

@ -77,7 +77,7 @@ public interface AlarmDao extends Dao<Alarm> {
PageData<AlarmId> findAlarmsIdsByEndTsBeforeAndTenantId(Long time, TenantId tenantId, PageLink pageLink);
List<AlarmId> findAlarmIdsByAssigneeId(UUID key);
List<AlarmId> findAlarmIdsByAssigneeId(TenantId tenantId, UUID userId, int limit);
void createEntityAlarmRecord(EntityAlarm entityAlarm);

View File

@ -385,10 +385,10 @@ public class BaseAlarmService extends AbstractEntityService implements AlarmServ
}
@Override
public List<AlarmId> findAlarmIdsByAssigneeId(UserId userId) {
public List<AlarmId> findAlarmIdsByAssigneeId(TenantId tenantId, UserId userId, int limit) {
log.trace("Executing findAlarmIdsByAssigneeId [{}]", userId);
validateId(userId, "Incorrect alarmId " + userId);
return alarmDao.findAlarmIdsByAssigneeId(userId.getId());
validateId(userId, "Incorrect userId " + userId);
return alarmDao.findAlarmIdsByAssigneeId(tenantId, userId.getId(), limit);
}
@Override

View File

@ -316,7 +316,7 @@ public interface AlarmRepository extends JpaRepository<AlarmEntity, UUID> {
AlarmInfoEntity findAlarmInfoById(@Param("tenantId") UUID tenantId, @Param("alarmId") UUID alarmId);
@Query("SELECT a.id FROM AlarmEntity a WHERE a.assigneeId = :assigneeId")
List<UUID> findAlarmIdsByAssigneeId(@Param("assigneeId") UUID assigneeId);
List<UUID> findAlarmIdsByAssigneeId(@Param("assigneeId") UUID assigneeId, Pageable pageable);
@Query(value = "SELECT create_or_update_active_alarm(:t_id, :c_id, :a_id, :a_created_ts, :a_o_id, :a_o_type, :a_type, :a_severity, " +
":a_start_ts, :a_end_ts, :a_details, :a_propagate, :a_propagate_to_owner, " +

View File

@ -66,7 +66,6 @@ import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
/**
* Created by Valerii Sosliuk on 5/19/2017.
@ -287,8 +286,9 @@ public class JpaAlarmDao extends JpaAbstractDao<AlarmEntity, Alarm> implements A
}
@Override
public List<AlarmId> findAlarmIdsByAssigneeId(UUID key) {
List<UUID> assignedAlarmIds = alarmRepository.findAlarmIdsByAssigneeId(key);
public List<AlarmId> findAlarmIdsByAssigneeId(TenantId tenantId, UUID userId, int limit) {
log.debug("[{}] findAlarmIdsByAssigneeId [{}] limit {}", tenantId, userId, limit);
List<UUID> assignedAlarmIds = alarmRepository.findAlarmIdsByAssigneeId(userId, PageRequest.of(0, limit));
return DaoUtil.fromUUIDs(assignedAlarmIds, AlarmId::new);
}