diff --git a/application/src/main/java/org/thingsboard/server/service/housekeeper/HousekeeperReprocessingService.java b/application/src/main/java/org/thingsboard/server/service/housekeeper/HousekeeperReprocessingService.java index c37496ff49..5c577dd39f 100644 --- a/application/src/main/java/org/thingsboard/server/service/housekeeper/HousekeeperReprocessingService.java +++ b/application/src/main/java/org/thingsboard/server/service/housekeeper/HousekeeperReprocessingService.java @@ -45,6 +45,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; @TbCoreComponent @Service @@ -70,6 +71,7 @@ public class HousekeeperReprocessingService { private final ExecutorService consumerExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("housekeeper-reprocessing-consumer")); private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("housekeeper-reprocessing-scheduler")); + protected AtomicInteger cycle = new AtomicInteger(); private boolean stopped; public HousekeeperReprocessingService(@Lazy DefaultHousekeeperService housekeeperService, @@ -86,6 +88,7 @@ public class HousekeeperReprocessingService { private void init() { scheduler.scheduleWithFixedDelay(() -> { try { + cycle.incrementAndGet(); startReprocessing(); } catch (Throwable e) { log.error("Unexpected error during reprocessing", e); @@ -109,8 +112,9 @@ public class HousekeeperReprocessingService { if (msgs.isEmpty() || msgs.stream().anyMatch(msg -> msg.getValue().getTask().getTs() >= startTs)) { // it's not time yet to process the message if (!consumer.isCommitSupported()) { + // resubmitting consumed messages if committing is not supported (for in-memory queue) for (TbProtoQueueMsg msg : msgs) { - producer.send(submitTpi, new TbProtoQueueMsg<>(msg.getKey(), msg.getValue()), null); + submit(msg.getKey(), msg.getValue()); } } break; @@ -119,7 +123,7 @@ public class HousekeeperReprocessingService { for (TbProtoQueueMsg msg : msgs) { log.trace("Reprocessing task: {}", msg); try { - housekeeperService.processTask(msg.getValue()); + reprocessTask(msg.getValue()); } catch (InterruptedException e) { return; } catch (Throwable e) { @@ -144,7 +148,26 @@ public class HousekeeperReprocessingService { }); } - // todo: dead letter queue if attempts count exceeds the configured maximum + private void reprocessTask(ToHousekeeperServiceMsg msg) throws Exception { + int attempt = msg.getTask().getAttempt(); + if (attempt > maxReprocessingAttempts) { + if (cycle.get() == 1) { // only reprocessing tasks with exceeded failures on first cycle (after start-up) + log.info("Trying to reprocess task with {} failed attempts: {}", attempt, msg); + } else { + // resubmitting msg to be processed on the next service start + msg = msg.toBuilder() + .setTask(msg.getTask().toBuilder() + .setTs(getReprocessingTs()) + .build()) + .build(); + submit(UUID.randomUUID(), msg); + return; + } + } + + housekeeperService.processTask(msg); + } + public void submitForReprocessing(ToHousekeeperServiceMsg msg, Throwable error) { HousekeeperTaskProto task = msg.getTask(); @@ -155,12 +178,24 @@ public class HousekeeperReprocessingService { .setTask(task.toBuilder() .setAttempt(attempt) .clearErrors().addAllErrors(errors) - .setTs(System.currentTimeMillis() + TimeUnit.SECONDS.toMillis((long) (reprocessingDelay * 0.8))) + .setTs(getReprocessingTs()) .build()) .build(); log.trace("Submitting for reprocessing: {}", msg); - producer.send(submitTpi, new TbProtoQueueMsg<>(UUID.randomUUID(), msg), null); // reprocessing topic has single partition, so we don't care about the msg key + submit(UUID.randomUUID(), msg); // reprocessing topic has single partition, so we don't care about the msg key + + if (task.getAttempt() >= maxReprocessingAttempts) { + log.warn("Failed to process task in {} attempts: {}", task.getAttempt(), msg); + } + } + + private void submit(UUID key, ToHousekeeperServiceMsg msg) { + producer.send(submitTpi, new TbProtoQueueMsg<>(key, msg), null); + } + + private long getReprocessingTs() { + return System.currentTimeMillis() + TimeUnit.SECONDS.toMillis((long) (reprocessingDelay * 0.8)); // *0.8 so that msgs submitted just after finishing reprocessing are processed on the next cycle } @PreDestroy diff --git a/application/src/test/java/org/thingsboard/server/service/housekeeper/HousekeeperServiceTest.java b/application/src/test/java/org/thingsboard/server/service/housekeeper/HousekeeperServiceTest.java index a0564d3bcb..ef3ce8c786 100644 --- a/application/src/test/java/org/thingsboard/server/service/housekeeper/HousekeeperServiceTest.java +++ b/application/src/test/java/org/thingsboard/server/service/housekeeper/HousekeeperServiceTest.java @@ -21,6 +21,7 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; import org.mockito.ArgumentMatcher; +import org.mockito.Mockito; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.mock.mockito.SpyBean; import org.springframework.test.context.TestPropertySource; @@ -36,6 +37,7 @@ import org.thingsboard.server.common.data.event.EventType; import org.thingsboard.server.common.data.event.LifecycleEvent; import org.thingsboard.server.common.data.housekeeper.HousekeeperTask; import org.thingsboard.server.common.data.housekeeper.HousekeeperTaskType; +import org.thingsboard.server.common.data.id.AlarmId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.RuleChainId; import org.thingsboard.server.common.data.id.RuleNodeId; @@ -81,6 +83,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.Mockito.doCallRealMethod; import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; @@ -90,7 +93,8 @@ import static org.springframework.test.web.servlet.result.MockMvcResultMatchers. "transport.http.enabled=true", "queue.core.housekeeper.reprocessing-start-delay-sec=1", "queue.core.housekeeper.task-reprocessing-delay-sec=2", - "queue.core.housekeeper.poll-interval-ms=1000" + "queue.core.housekeeper.poll-interval-ms=1000", + "queue.core.housekeeper.max-reprocessing-attempts=5" }) public class HousekeeperServiceTest extends AbstractControllerTest { @@ -170,7 +174,8 @@ public class HousekeeperServiceTest extends AbstractControllerTest { .severity(AlarmSeverity.MAJOR) .build(); alarm = doPost("/api/alarm", alarm, Alarm.class); - alarm = doPost("/api/alarm/" + alarm.getId() + "/assign/" + userId, "", Alarm.class); + AlarmId alarmId = alarm.getId(); + alarm = doPost("/api/alarm/" + alarmId + "/assign/" + userId, "", Alarm.class); assertThat(alarm.getAssigneeId()).isEqualTo(userId); assertThat(alarmService.findAlarmIdsByAssigneeId(tenantId, userId, new PageLink(100)).getData()).isNotEmpty(); @@ -178,8 +183,8 @@ public class HousekeeperServiceTest extends AbstractControllerTest { await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> { verifyNoRelatedData(userId); + assertThat(alarmService.findAlarmById(tenantId, alarmId).getAssigneeId()).isNull(); }); - assertThat(alarmService.findAlarmById(tenantId, alarm.getId()).getAssigneeId()).isNull(); } @Test @@ -228,15 +233,12 @@ public class HousekeeperServiceTest extends AbstractControllerTest { int attempts = 3; await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> { - verify(housekeeperService).processTask(argThat(verifyTaskSubmission(device.getId(), HousekeeperTaskType.DELETE_TELEMETRY, - task -> task.getErrorsCount() == 0))); - + verifyTaskProcessing(device.getId(), HousekeeperTaskType.DELETE_TELEMETRY, 0); for (int i = 1; i <= attempts; i++) { int attempt = i; - verify(housekeeperReprocessingService).submitForReprocessing(argThat(verifyTaskSubmission(device.getId(), HousekeeperTaskType.DELETE_TELEMETRY, + verify(housekeeperReprocessingService).submitForReprocessing(argThat(getTaskMatcher(device.getId(), HousekeeperTaskType.DELETE_TELEMETRY, task -> task.getErrorsCount() > 0 && task.getAttempt() == attempt)), argThat(e -> e.getMessage().equals(error.getMessage()))); - verify(housekeeperService).processTask(argThat(verifyTaskSubmission(device.getId(), HousekeeperTaskType.DELETE_TELEMETRY, - task -> task.getErrorsCount() > 0 && task.getAttempt() == attempt))); + verifyTaskProcessing(device.getId(), HousekeeperTaskType.DELETE_TELEMETRY, attempt); } }); @@ -247,11 +249,43 @@ public class HousekeeperServiceTest extends AbstractControllerTest { }); } - private ArgumentMatcher verifyTaskSubmission(EntityId entityId, HousekeeperTaskType taskType, - Predicate additionalCheck) { + @Test + public void whenReprocessingAttemptsExceeded_thenReprocessOnNextStartUp() throws Exception { + TimeoutException error = new TimeoutException("Test timeout"); + doThrow(error).when(telemetryDeletionTaskProcessor).process(any()); + + Device device = createDevice("woeifjiowejf", "woeifjiowejf"); + createRelatedData(device.getId()); + + doDelete("/api/device/" + device.getId()).andExpect(status().isOk()); + + int maxAttempts = 5; + await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> { + for (int i = 1; i <= maxAttempts; i++) { + verifyTaskProcessing(device.getId(), HousekeeperTaskType.DELETE_TELEMETRY, i); + } + }); + + Mockito.clearInvocations(housekeeperService); + doCallRealMethod().when(telemetryDeletionTaskProcessor).process(any()); + TimeUnit.SECONDS.sleep(2); + verify(housekeeperService, never()).processTask(argThat(getTaskMatcher(device.getId(), HousekeeperTaskType.DELETE_TELEMETRY, null))); + + housekeeperReprocessingService.cycle.set(0); // imitating start-up + await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> { + verifyTaskProcessing(device.getId(), HousekeeperTaskType.DELETE_TELEMETRY, 6); + }); + } + + private void verifyTaskProcessing(EntityId entityId, HousekeeperTaskType taskType, int expectedAttempt) throws Exception { + verify(housekeeperService).processTask(argThat(getTaskMatcher(entityId, taskType, task -> task.getAttempt() == expectedAttempt))); + } + + private ArgumentMatcher getTaskMatcher(EntityId entityId, HousekeeperTaskType taskType, + Predicate additionalCheck) { return msg -> { HousekeeperTask task = JacksonUtil.fromString(msg.getTask().getValue(), HousekeeperTask.class); - return task.getEntityId().equals(entityId) && task.getTaskType() == taskType && additionalCheck.test(msg.getTask()); + return task.getEntityId().equals(entityId) && task.getTaskType() == taskType && (additionalCheck == null || additionalCheck.test(msg.getTask())); }; }