Alarms unassign housekeeper task; remove in-memory housekeeper service

This commit is contained in:
ViacheslavKlimov 2024-02-07 16:50:03 +02:00
parent 1d5b35a5aa
commit d5d18f1138
14 changed files with 82 additions and 140 deletions

View File

@ -176,16 +176,16 @@ public class DefaultTbAlarmService extends AbstractTbEntityService implements Tb
}
@Override
public List<AlarmId> unassignDeletedUserAlarms(TenantId tenantId, User user, long unassignTs) {
public List<AlarmId> unassignDeletedUserAlarms(TenantId tenantId, UserId userId, String userTitle, long unassignTs) {
List<AlarmId> totalAlarmIds = new ArrayList<>();
PageLink pageLink = new PageLink(100, 0, null, new SortOrder("id", SortOrder.Direction.ASC));
while (true) {
PageData<AlarmId> pageData = alarmService.findAlarmIdsByAssigneeId(user.getTenantId(), user.getId(), pageLink);
PageData<AlarmId> pageData = alarmService.findAlarmIdsByAssigneeId(tenantId, userId, pageLink);
List<AlarmId> alarmIds = pageData.getData();
if (alarmIds.isEmpty()) {
break;
}
processAlarmsUnassignment(tenantId, user, alarmIds, unassignTs);
processAlarmsUnassignment(tenantId, userId, userTitle, alarmIds, unassignTs);
totalAlarmIds.addAll(alarmIds);
pageLink = pageLink.nextPageLink();
}
@ -204,16 +204,16 @@ public class DefaultTbAlarmService extends AbstractTbEntityService implements Tb
return ts > 0 ? ts : System.currentTimeMillis();
}
private void processAlarmsUnassignment(TenantId tenantId, User user, List<AlarmId> alarmIds, long unassignTs) {
private void processAlarmsUnassignment(TenantId tenantId, UserId userId, String userTitle, List<AlarmId> alarmIds, long unassignTs) {
for (AlarmId alarmId : alarmIds) {
log.trace("[{}] Unassigning alarm {} userId {}", tenantId, alarmId, user.getId());
AlarmApiCallResult result = alarmSubscriptionService.unassignAlarm(user.getTenantId(), alarmId, unassignTs);
log.trace("[{}] Unassigning alarm {} userId {}", tenantId, alarmId, userId);
AlarmApiCallResult result = alarmSubscriptionService.unassignAlarm(tenantId, alarmId, unassignTs);
if (!result.isSuccessful()) {
log.error("[{}] Cannot unassign alarm {} userId {}", tenantId, alarmId, user.getId());
log.error("[{}] Cannot unassign alarm {} userId {}", tenantId, alarmId, userId);
continue;
}
if (result.isModified()) {
String comment = String.format("Alarm was unassigned because user %s - was deleted", user.getTitle());
String comment = String.format("Alarm was unassigned because user %s - was deleted", userTitle);
addSystemAlarmComment(result.getAlarm(), null, "ASSIGN", comment);
notificationEntityService.logEntityAction(result.getAlarm().getTenantId(), result.getAlarm().getOriginator(), result.getAlarm(), result.getAlarm().getCustomerId(), ActionType.ALARM_UNASSIGNED, null);
}

View File

@ -41,7 +41,7 @@ public interface TbAlarmService {
AlarmInfo unassign(Alarm alarm, long unassignTs, User user) throws ThingsboardException;
List<AlarmId> unassignDeletedUserAlarms(TenantId tenantId, User user, long unassignTs);
List<AlarmId> unassignDeletedUserAlarms(TenantId tenantId, UserId userId, String userTitle, long unassignTs);
Boolean delete(Alarm alarm, User user);
}

View File

@ -1,103 +0,0 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.housekeeper;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.transaction.event.TransactionalEventListener;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.User;
import org.thingsboard.server.common.data.id.AlarmId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.dao.eventsourcing.DeleteEntityEvent;
import org.thingsboard.server.dao.housekeeper.HouseKeeperService;
import org.thingsboard.server.service.entitiy.alarm.TbAlarmService;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
@Component
@RequiredArgsConstructor
@Slf4j
public class InMemoryHouseKeeperServiceService implements HouseKeeperService {
final TbAlarmService alarmService;
ListeningExecutorService executor;
AtomicInteger queueSize = new AtomicInteger();
AtomicInteger totalProcessedCounter = new AtomicInteger();
@PostConstruct
public void init() {
log.debug("Starting HouseKeeper service");
executor = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("housekeeper")));
}
@PreDestroy
public void destroy() {
if (executor != null) {
log.debug("Stopping HouseKeeper service");
executor.shutdown();
}
}
@TransactionalEventListener(fallbackExecution = true)
public void handleEvent(DeleteEntityEvent<?> event) {
log.trace("[{}] DeleteEntityEvent handler: {}", event.getTenantId(), event);
EntityId entityId = event.getEntityId();
if (EntityType.USER.equals(entityId.getEntityType())) {
unassignDeletedUserAlarms(event.getTenantId(), (User) event.getEntity(), event.getTs());
}
}
@Override
public ListenableFuture<List<AlarmId>> unassignDeletedUserAlarms(TenantId tenantId, User user, long unassignTs) {
log.debug("[{}][{}] unassignDeletedUserAlarms submitting, pending queue size: {} ", tenantId, user.getId().getId(), queueSize.get());
queueSize.incrementAndGet();
ListenableFuture<List<AlarmId>> future = executor.submit(() -> alarmService.unassignDeletedUserAlarms(tenantId, user, unassignTs));
Futures.addCallback(future, new FutureCallback<>() {
@Override
public void onSuccess(List<AlarmId> alarmIds) {
queueSize.decrementAndGet();
totalProcessedCounter.incrementAndGet();
log.debug("[{}][{}] unassignDeletedUserAlarms finished, pending queue size: {}, total processed count: {} ",
tenantId, user.getId().getId(), queueSize.get(), totalProcessedCounter.get());
}
@Override
public void onFailure(Throwable throwable) {
queueSize.decrementAndGet();
totalProcessedCounter.incrementAndGet();
log.error("[{}][{}] unassignDeletedUserAlarms failed, pending queue size: {}, total processed count: {}",
tenantId, user.getId().getId(), queueSize.get(), totalProcessedCounter.get(), throwable);
}
}, MoreExecutors.directExecutor());
return future;
}
}

View File

@ -0,0 +1,41 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.housekeeper.processor;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
import org.thingsboard.server.common.data.id.UserId;
import org.thingsboard.server.dao.housekeeper.data.HousekeeperTaskType;
import org.thingsboard.server.dao.housekeeper.data.AlarmsUnassignHousekeeperTask;
import org.thingsboard.server.service.entitiy.alarm.TbAlarmService;
@Component
@RequiredArgsConstructor
public class AlarmsUnassignTaskProcessor implements HousekeeperTaskProcessor<AlarmsUnassignHousekeeperTask> {
private final TbAlarmService alarmService;
@Override
public void process(AlarmsUnassignHousekeeperTask task) throws Exception {
alarmService.unassignDeletedUserAlarms(task.getTenantId(), (UserId) task.getEntityId(), task.getUserTitle(), task.getTs());
}
@Override
public HousekeeperTaskType getTaskType() {
return HousekeeperTaskType.UNASSIGN_ALARMS;
}
}

View File

@ -17,14 +17,13 @@ package org.thingsboard.server.service.housekeeper.processor;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.dao.attributes.AttributesService;
import org.thingsboard.server.dao.housekeeper.data.HousekeeperTask;
import org.thingsboard.server.dao.housekeeper.data.HousekeeperTaskType;
@Component
@RequiredArgsConstructor
public class AttributesDeletionTaskProcessor implements HousekeeperTaskProcessor {
public class AttributesDeletionTaskProcessor implements HousekeeperTaskProcessor<HousekeeperTask> {
private final AttributesService attributesService;

View File

@ -23,7 +23,7 @@ import org.thingsboard.server.dao.housekeeper.data.HousekeeperTaskType;
@Component
@RequiredArgsConstructor
public class EntityAlarmsDeletionTaskProcessor implements HousekeeperTaskProcessor {
public class EntityAlarmsDeletionTaskProcessor implements HousekeeperTaskProcessor<HousekeeperTask> {
private final AlarmService alarmService;

View File

@ -22,7 +22,7 @@ import org.thingsboard.server.dao.housekeeper.data.HousekeeperTaskType;
@Component
@RequiredArgsConstructor
public class EntityDeletionTaskProcessor implements HousekeeperTaskProcessor {
public class EntityDeletionTaskProcessor implements HousekeeperTaskProcessor<HousekeeperTask> {
@Override
public void process(HousekeeperTask task) throws Exception {

View File

@ -23,7 +23,7 @@ import org.thingsboard.server.dao.housekeeper.data.HousekeeperTaskType;
@Component
@RequiredArgsConstructor
public class EventsDeletionTaskProcessor implements HousekeeperTaskProcessor {
public class EventsDeletionTaskProcessor implements HousekeeperTaskProcessor<HousekeeperTask> {
private final EventService eventService;
@Override

View File

@ -18,9 +18,9 @@ package org.thingsboard.server.service.housekeeper.processor;
import org.thingsboard.server.dao.housekeeper.data.HousekeeperTask;
import org.thingsboard.server.dao.housekeeper.data.HousekeeperTaskType;
public interface HousekeeperTaskProcessor {
public interface HousekeeperTaskProcessor<T extends HousekeeperTask> {
void process(HousekeeperTask task) throws Exception;
void process(T task) throws Exception;
HousekeeperTaskType getTaskType();

View File

@ -23,7 +23,7 @@ import org.thingsboard.server.dao.timeseries.TimeseriesService;
@Component
@RequiredArgsConstructor
public class TelemetryDeletionTaskProcessor implements HousekeeperTaskProcessor {
public class TelemetryDeletionTaskProcessor implements HousekeeperTaskProcessor<HousekeeperTask> {
private final TimeseriesService timeseriesService;

View File

@ -173,7 +173,7 @@ public class DefaultTbAlarmServiceTest {
User user = new User();
user.setEmail("testEmail@gmail.com");
user.setId(new UserId(UUID.randomUUID()));
service.unassignDeletedUserAlarms(new TenantId(UUID.randomUUID()), user, System.currentTimeMillis());
service.unassignDeletedUserAlarms(new TenantId(UUID.randomUUID()), user.getId(), user.getTitle(), System.currentTimeMillis());
ObjectNode commentNode = JacksonUtil.newObjectNode();
commentNode.put("subtype", "ASSIGN");

View File

@ -24,6 +24,7 @@ import org.springframework.transaction.event.TransactionalEventListener;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.EntityView;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.User;
import org.thingsboard.server.common.data.id.EdgeId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
@ -78,9 +79,8 @@ public abstract class AbstractEntityService {
log.trace("[{}] DeleteEntityEvent handler: {}", tenantId, event);
cleanUpRelatedData(tenantId, entityId);
if (EntityType.USER.equals(entityId.getEntityType())) {
// housekeeperService.submitTask(HousekeeperTask.unassignAlarms(tenantId, entityId));
// unassignDeletedUserAlarms(tenantId, (User) event.getEntity(), event.getTs());
if (entityId.getEntityType() == EntityType.USER) {
housekeeperService.submitTask(HousekeeperTask.unassignAlarms((User) event.getEntity()));
}
}

View File

@ -13,17 +13,21 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.dao.housekeeper;
package org.thingsboard.server.dao.housekeeper.data;
import com.google.common.util.concurrent.ListenableFuture;
import lombok.Getter;
import org.thingsboard.server.common.data.User;
import org.thingsboard.server.common.data.id.AlarmId;
import org.thingsboard.server.common.data.id.TenantId;
import java.util.List;
@Getter
public class AlarmsUnassignHousekeeperTask extends HousekeeperTask {
public interface HouseKeeperService {
private final String userTitle;
private final long ts;
ListenableFuture<List<AlarmId>> unassignDeletedUserAlarms(TenantId tenantId, User user, long unassignTs);
protected AlarmsUnassignHousekeeperTask(User user) {
super(user.getTenantId(), user.getId(), HousekeeperTaskType.UNASSIGN_ALARMS);
this.userTitle = user.getTitle();
this.ts = System.currentTimeMillis();
}
}

View File

@ -15,27 +15,28 @@
*/
package org.thingsboard.server.dao.housekeeper.data;
import lombok.Data;
import lombok.Getter;
import org.thingsboard.server.common.data.User;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.id.UserId;
import java.io.Serializable;
/*
* on start, read the retry queue and put the messages back to main queue (save offset)
* */
@Data
@Getter
public class HousekeeperTask implements Serializable {
private final TenantId tenantId;
private final EntityId entityId;
private final HousekeeperTaskType taskType;
// maybe we should not delete relations asynchronously
// public static HousekeeperTask deleteRelations(TenantId tenantId, EntityId entityId) {
// return new HousekeeperTask(tenantId, entityId, HousekeeperTaskType.DELETE_RELATIONS);
// }
protected HousekeeperTask(TenantId tenantId, EntityId entityId, HousekeeperTaskType taskType) {
this.tenantId = tenantId;
this.entityId = entityId;
this.taskType = taskType;
}
public static HousekeeperTask deleteAttributes(TenantId tenantId, EntityId entityId) {
return new HousekeeperTask(tenantId, entityId, HousekeeperTaskType.DELETE_ATTRIBUTES);
@ -49,8 +50,8 @@ public class HousekeeperTask implements Serializable {
return new HousekeeperTask(tenantId, entityId, HousekeeperTaskType.DELETE_EVENTS);
}
public static HousekeeperTask unassignAlarms(TenantId tenantId, UserId userId) {
return new HousekeeperTask(tenantId, userId, HousekeeperTaskType.UNASSIGN_ALARMS);
public static HousekeeperTask unassignAlarms(User user) {
return new AlarmsUnassignHousekeeperTask(user);
}
public static HousekeeperTask deleteEntityAlarms(TenantId tenantId, EntityId entityId) {