Max reprocessing attempts for Housekeeper task

ViacheslavKlimov 2024-03-13 18:34:27 +02:00
parent 20ea8c7fc2
commit 82476b629c
2 changed files with 86 additions and 17 deletions
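Housekeeper tasks already carry an attempt counter and a reprocessing timestamp. With this change, the reprocessing service stops retrying a task in steady state once its attempt count exceeds the configured maximum (queue.core.housekeeper.max-reprocessing-attempts): the exhausted task is resubmitted to the reprocessing queue with a fresh timestamp and is only retried again on the first reprocessing cycle after a service restart. A dead letter queue for such tasks is left as a todo.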

HousekeeperReprocessingService.java

@@ -45,6 +45,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 @TbCoreComponent
 @Service
@@ -70,6 +71,7 @@ public class HousekeeperReprocessingService {
     private final ExecutorService consumerExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("housekeeper-reprocessing-consumer"));
     private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("housekeeper-reprocessing-scheduler"));
+    protected AtomicInteger cycle = new AtomicInteger();
     private boolean stopped;
 
     public HousekeeperReprocessingService(@Lazy DefaultHousekeeperService housekeeperService,
@@ -86,6 +88,7 @@ public class HousekeeperReprocessingService {
     private void init() {
         scheduler.scheduleWithFixedDelay(() -> {
             try {
+                cycle.incrementAndGet();
                 startReprocessing();
             } catch (Throwable e) {
                 log.error("Unexpected error during reprocessing", e);
@@ -109,8 +112,9 @@
                 if (msgs.isEmpty() || msgs.stream().anyMatch(msg -> msg.getValue().getTask().getTs() >= startTs)) {
                     // it's not time yet to process the message
                     if (!consumer.isCommitSupported()) {
+                        // resubmitting consumed messages if committing is not supported (for in-memory queue)
                         for (TbProtoQueueMsg<ToHousekeeperServiceMsg> msg : msgs) {
-                            producer.send(submitTpi, new TbProtoQueueMsg<>(msg.getKey(), msg.getValue()), null);
+                            submit(msg.getKey(), msg.getValue());
                         }
                     }
                     break;
@@ -119,7 +123,7 @@
                 for (TbProtoQueueMsg<ToHousekeeperServiceMsg> msg : msgs) {
                     log.trace("Reprocessing task: {}", msg);
                     try {
-                        housekeeperService.processTask(msg.getValue());
+                        reprocessTask(msg.getValue());
                     } catch (InterruptedException e) {
                         return;
                     } catch (Throwable e) {
@@ -144,7 +148,26 @@
         });
     }
 
+    // todo: dead letter queue if attempts count exceeds the configured maximum
+    private void reprocessTask(ToHousekeeperServiceMsg msg) throws Exception {
+        int attempt = msg.getTask().getAttempt();
+        if (attempt > maxReprocessingAttempts) {
+            if (cycle.get() == 1) { // only reprocessing tasks with exceeded failures on first cycle (after start-up)
+                log.info("Trying to reprocess task with {} failed attempts: {}", attempt, msg);
+            } else {
+                // resubmitting msg to be processed on the next service start
+                msg = msg.toBuilder()
+                        .setTask(msg.getTask().toBuilder()
+                                .setTs(getReprocessingTs())
+                                .build())
+                        .build();
+                submit(UUID.randomUUID(), msg);
+                return;
+            }
+        }
+        housekeeperService.processTask(msg);
+    }
+
     public void submitForReprocessing(ToHousekeeperServiceMsg msg, Throwable error) {
         HousekeeperTaskProto task = msg.getTask();
@@ -155,12 +178,24 @@
                 .setTask(task.toBuilder()
                         .setAttempt(attempt)
                         .clearErrors().addAllErrors(errors)
-                        .setTs(System.currentTimeMillis() + TimeUnit.SECONDS.toMillis((long) (reprocessingDelay * 0.8)))
+                        .setTs(getReprocessingTs())
                         .build())
                 .build();
         log.trace("Submitting for reprocessing: {}", msg);
-        producer.send(submitTpi, new TbProtoQueueMsg<>(UUID.randomUUID(), msg), null); // reprocessing topic has single partition, so we don't care about the msg key
+        submit(UUID.randomUUID(), msg); // reprocessing topic has single partition, so we don't care about the msg key
+
+        if (task.getAttempt() >= maxReprocessingAttempts) {
+            log.warn("Failed to process task in {} attempts: {}", task.getAttempt(), msg);
+        }
     }
 
+    private void submit(UUID key, ToHousekeeperServiceMsg msg) {
+        producer.send(submitTpi, new TbProtoQueueMsg<>(key, msg), null);
+    }
+
+    private long getReprocessingTs() {
+        return System.currentTimeMillis() + TimeUnit.SECONDS.toMillis((long) (reprocessingDelay * 0.8)); // *0.8 so that msgs submitted just after finishing reprocessing are processed on the next cycle
+    }
+
     @PreDestroy
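For readers skimming the diff, below is a minimal, self-contained sketch of the gating logic that the new reprocessTask() and getReprocessingTs() implement. The class name, the constant values, and the Runnable callback are illustrative assumptions; the real service operates on ToHousekeeperServiceMsg protos and resubmits through the queue producer:

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative sketch only: a simplified stand-in for reprocessTask()/getReprocessingTs().
class ReprocessingGateSketch {
    static final int MAX_REPROCESSING_ATTEMPTS = 5; // e.g. the value the test below configures
    static final long REPROCESSING_DELAY_SEC = 2;   // e.g. task-reprocessing-delay-sec from the test

    final AtomicInteger cycle = new AtomicInteger(); // incremented at the start of every reprocessing pass

    // Returns true if the task should be processed now; false if it was deferred
    // by resubmitting it with a fresh timestamp, to be retried after the next restart.
    boolean shouldProcessNow(int attempt, Runnable resubmitWithNewTs) {
        if (attempt > MAX_REPROCESSING_ATTEMPTS) {
            if (cycle.get() == 1) {
                return true; // first cycle after start-up: exhausted tasks get one more try
            }
            resubmitWithNewTs.run(); // parked until a restart resets the cycle counter
            return false;
        }
        return true; // attempts not exhausted: normal retry
    }

    // Mirrors getReprocessingTs(): 0.8 * delay, so a message submitted right after a
    // pass finishes is already due when the next pass polls, rather than slipping a cycle.
    long reprocessingTs() {
        return System.currentTimeMillis() + TimeUnit.SECONDS.toMillis((long) (REPROCESSING_DELAY_SEC * 0.8));
    }

    public static void main(String[] args) {
        ReprocessingGateSketch gate = new ReprocessingGateSketch();
        gate.cycle.incrementAndGet(); // first pass after "start-up"
        System.out.println(gate.shouldProcessNow(6, () -> {})); // true: retried once more
        gate.cycle.incrementAndGet(); // second pass
        System.out.println(gate.shouldProcessNow(6, () -> System.out.println("deferred"))); // deferred, then false
    }
}

Resetting the cycle counter (as the new test does with cycle.set(0)) re-opens this gate exactly as a restart would.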

HousekeeperServiceTest.java

@@ -21,6 +21,7 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.ArgumentMatcher;
+import org.mockito.Mockito;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.test.mock.mockito.SpyBean;
 import org.springframework.test.context.TestPropertySource;
@@ -36,6 +37,7 @@ import org.thingsboard.server.common.data.event.EventType;
 import org.thingsboard.server.common.data.event.LifecycleEvent;
 import org.thingsboard.server.common.data.housekeeper.HousekeeperTask;
 import org.thingsboard.server.common.data.housekeeper.HousekeeperTaskType;
+import org.thingsboard.server.common.data.id.AlarmId;
 import org.thingsboard.server.common.data.id.EntityId;
 import org.thingsboard.server.common.data.id.RuleChainId;
 import org.thingsboard.server.common.data.id.RuleNodeId;
@@ -81,6 +83,7 @@ import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.argThat;
 import static org.mockito.Mockito.doCallRealMethod;
 import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
 import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
@@ -90,7 +93,8 @@ import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
         "transport.http.enabled=true",
         "queue.core.housekeeper.reprocessing-start-delay-sec=1",
         "queue.core.housekeeper.task-reprocessing-delay-sec=2",
-        "queue.core.housekeeper.poll-interval-ms=1000"
+        "queue.core.housekeeper.poll-interval-ms=1000",
+        "queue.core.housekeeper.max-reprocessing-attempts=5"
 })
 public class HousekeeperServiceTest extends AbstractControllerTest {
@@ -170,7 +174,8 @@ public class HousekeeperServiceTest extends AbstractControllerTest {
                 .severity(AlarmSeverity.MAJOR)
                 .build();
         alarm = doPost("/api/alarm", alarm, Alarm.class);
-        alarm = doPost("/api/alarm/" + alarm.getId() + "/assign/" + userId, "", Alarm.class);
+        AlarmId alarmId = alarm.getId();
+        alarm = doPost("/api/alarm/" + alarmId + "/assign/" + userId, "", Alarm.class);
         assertThat(alarm.getAssigneeId()).isEqualTo(userId);
         assertThat(alarmService.findAlarmIdsByAssigneeId(tenantId, userId, new PageLink(100)).getData()).isNotEmpty();
@@ -178,8 +183,8 @@
         await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
             verifyNoRelatedData(userId);
+            assertThat(alarmService.findAlarmById(tenantId, alarmId).getAssigneeId()).isNull();
         });
-        assertThat(alarmService.findAlarmById(tenantId, alarm.getId()).getAssigneeId()).isNull();
     }
 
     @Test
@@ -228,15 +233,12 @@
         int attempts = 3;
         await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> {
-            verify(housekeeperService).processTask(argThat(verifyTaskSubmission(device.getId(), HousekeeperTaskType.DELETE_TELEMETRY,
-                    task -> task.getErrorsCount() == 0)));
+            verifyTaskProcessing(device.getId(), HousekeeperTaskType.DELETE_TELEMETRY, 0);
             for (int i = 1; i <= attempts; i++) {
                 int attempt = i;
-                verify(housekeeperReprocessingService).submitForReprocessing(argThat(verifyTaskSubmission(device.getId(), HousekeeperTaskType.DELETE_TELEMETRY,
+                verify(housekeeperReprocessingService).submitForReprocessing(argThat(getTaskMatcher(device.getId(), HousekeeperTaskType.DELETE_TELEMETRY,
                         task -> task.getErrorsCount() > 0 && task.getAttempt() == attempt)), argThat(e -> e.getMessage().equals(error.getMessage())));
-                verify(housekeeperService).processTask(argThat(verifyTaskSubmission(device.getId(), HousekeeperTaskType.DELETE_TELEMETRY,
-                        task -> task.getErrorsCount() > 0 && task.getAttempt() == attempt)));
+                verifyTaskProcessing(device.getId(), HousekeeperTaskType.DELETE_TELEMETRY, attempt);
             }
         });
@@ -247,11 +249,43 @@
         });
     }
 
-    private ArgumentMatcher<ToHousekeeperServiceMsg> verifyTaskSubmission(EntityId entityId, HousekeeperTaskType taskType,
-                                                                          Predicate<HousekeeperTaskProto> additionalCheck) {
+    @Test
+    public void whenReprocessingAttemptsExceeded_thenReprocessOnNextStartUp() throws Exception {
+        TimeoutException error = new TimeoutException("Test timeout");
+        doThrow(error).when(telemetryDeletionTaskProcessor).process(any());
+        Device device = createDevice("woeifjiowejf", "woeifjiowejf");
+        createRelatedData(device.getId());
+
+        doDelete("/api/device/" + device.getId()).andExpect(status().isOk());
+
+        int maxAttempts = 5;
+        await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> {
+            for (int i = 1; i <= maxAttempts; i++) {
+                verifyTaskProcessing(device.getId(), HousekeeperTaskType.DELETE_TELEMETRY, i);
+            }
+        });
+
+        Mockito.clearInvocations(housekeeperService);
+        doCallRealMethod().when(telemetryDeletionTaskProcessor).process(any());
+        TimeUnit.SECONDS.sleep(2);
+        verify(housekeeperService, never()).processTask(argThat(getTaskMatcher(device.getId(), HousekeeperTaskType.DELETE_TELEMETRY, null)));
+
+        housekeeperReprocessingService.cycle.set(0); // imitating start-up
+        await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
+            verifyTaskProcessing(device.getId(), HousekeeperTaskType.DELETE_TELEMETRY, 6);
+        });
+    }
+
+    private void verifyTaskProcessing(EntityId entityId, HousekeeperTaskType taskType, int expectedAttempt) throws Exception {
+        verify(housekeeperService).processTask(argThat(getTaskMatcher(entityId, taskType, task -> task.getAttempt() == expectedAttempt)));
+    }
+
+    private ArgumentMatcher<ToHousekeeperServiceMsg> getTaskMatcher(EntityId entityId, HousekeeperTaskType taskType,
+                                                                    Predicate<HousekeeperTaskProto> additionalCheck) {
         return msg -> {
             HousekeeperTask task = JacksonUtil.fromString(msg.getTask().getValue(), HousekeeperTask.class);
-            return task.getEntityId().equals(entityId) && task.getTaskType() == taskType && additionalCheck.test(msg.getTask());
+            return task.getEntityId().equals(entityId) && task.getTaskType() == taskType && (additionalCheck == null || additionalCheck.test(msg.getTask()));
         };
     }
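A note on the matcher change: getTaskMatcher (renamed from verifyTaskSubmission) now treats a null additionalCheck as "match any task of this type for this entity". The new test relies on this together with never() to assert that an exhausted task is not retried in steady state; resetting cycle to 0 then imitates a restart, after which the task is processed one more time (the sixth attempt).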