Housekeeper: tests and refactoring
This commit is contained in:
parent
b82bceceb4
commit
0d3c3d6b25
@ -17,6 +17,7 @@ package org.thingsboard.server.service.housekeeper;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.context.annotation.Lazy;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.thingsboard.common.util.JacksonUtil;
|
||||
@ -53,6 +54,7 @@ import java.util.stream.Collectors;
|
||||
@TbCoreComponent
|
||||
@Service
|
||||
@Slf4j
|
||||
@ConditionalOnProperty(name = "queue.core.housekeeper.enabled", havingValue = "true", matchIfMissing = true)
|
||||
public class DefaultHousekeeperService implements HousekeeperService {
|
||||
|
||||
private final Map<HousekeeperTaskType, HousekeeperTaskProcessor<?>> taskProcessors;
|
||||
@ -136,9 +138,7 @@ public class DefaultHousekeeperService implements HousekeeperService {
|
||||
}
|
||||
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("[{}] {} task {}", task.getTenantId(),
|
||||
msg.getTask().getErrorsCount() == 0 ? "Processing" : "Reprocessing",
|
||||
msg.getTask().getValue());
|
||||
log.debug("[{}] {} task {}", task.getTenantId(), isNew(msg.getTask()) ? "Processing" : "Reprocessing", msg.getTask().getValue());
|
||||
}
|
||||
try {
|
||||
Future<Object> future = executor.submit(() -> {
|
||||
@ -146,7 +146,9 @@ public class DefaultHousekeeperService implements HousekeeperService {
|
||||
return null;
|
||||
});
|
||||
future.get(taskProcessingTimeout, TimeUnit.MILLISECONDS);
|
||||
|
||||
statsService.reportProcessed(task.getTaskType(), msg);
|
||||
log.debug("[{}] Successfully {} task {}", task.getTenantId(), isNew(msg.getTask()) ? "processed" : "reprocessed", msg.getTask().getValue());
|
||||
} catch (InterruptedException e) {
|
||||
throw e;
|
||||
} catch (Throwable e) {
|
||||
@ -160,6 +162,7 @@ public class DefaultHousekeeperService implements HousekeeperService {
|
||||
task.getTenantId(), task.getEntityId().getEntityType(), task.getEntityId(),
|
||||
task.getTaskType(), msg.getTask().getAttempt(), task, error);
|
||||
reprocessingService.submitForReprocessing(msg, error);
|
||||
|
||||
statsService.reportFailure(task.getTaskType(), msg);
|
||||
notificationRuleProcessor.process(TaskProcessingFailureTrigger.builder()
|
||||
.task(task)
|
||||
@ -185,12 +188,19 @@ public class DefaultHousekeeperService implements HousekeeperService {
|
||||
.build()), null);
|
||||
}
|
||||
|
||||
private boolean isNew(HousekeeperTaskProto task) {
|
||||
return task.getErrorsCount() == 0;
|
||||
}
|
||||
|
||||
@PreDestroy
|
||||
private void stop() {
|
||||
log.info("Stopped Housekeeper service");
|
||||
private void stop() throws Exception {
|
||||
stopped = true;
|
||||
consumer.unsubscribe();
|
||||
consumerExecutor.shutdown();
|
||||
if (!consumerExecutor.awaitTermination(10, TimeUnit.SECONDS)) {
|
||||
consumerExecutor.shutdownNow();
|
||||
}
|
||||
log.info("Stopped Housekeeper service");
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -18,6 +18,7 @@ package org.thingsboard.server.service.housekeeper;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.exception.ExceptionUtils;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.context.annotation.Lazy;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.thingsboard.common.util.ThingsBoardThreadFactory;
|
||||
@ -27,6 +28,7 @@ import org.thingsboard.server.common.msg.queue.ServiceType;
|
||||
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.HousekeeperTaskProto;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToHousekeeperServiceMsg;
|
||||
import org.thingsboard.server.queue.TbQueueProducer;
|
||||
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
|
||||
import org.thingsboard.server.queue.discovery.PartitionService;
|
||||
import org.thingsboard.server.queue.provider.TbCoreQueueFactory;
|
||||
@ -47,16 +49,18 @@ import java.util.concurrent.TimeUnit;
|
||||
@TbCoreComponent
|
||||
@Service
|
||||
@Slf4j
|
||||
@ConditionalOnProperty(name = "queue.core.housekeeper.enabled", havingValue = "true", matchIfMissing = true)
|
||||
public class HousekeeperReprocessingService {
|
||||
|
||||
private final DefaultHousekeeperService housekeeperService;
|
||||
private final PartitionService partitionService;
|
||||
private final TbCoreQueueFactory queueFactory;
|
||||
private final TbQueueProducerProvider producerProvider;
|
||||
private final TbQueueProducer<TbProtoQueueMsg<ToHousekeeperServiceMsg>> producer;
|
||||
private final TopicPartitionInfo submitTpi;
|
||||
|
||||
@Value("${queue.core.housekeeper.reprocessing-start-delay-sec:15}") // fixme: to 5 minutes
|
||||
@Value("${queue.core.housekeeper.reprocessing-start-delay-sec:300}")
|
||||
private int startDelay;
|
||||
@Value("${queue.core.housekeeper.task-reprocessing-delay-sec:30}") // fixme: to 30 minutes or 1 hour
|
||||
@Value("${queue.core.housekeeper.task-reprocessing-delay-sec:3600}")
|
||||
private int reprocessingDelay;
|
||||
@Value("${queue.core.housekeeper.max-reprocessing-attempts:10}")
|
||||
private int maxReprocessingAttempts;
|
||||
@ -74,7 +78,8 @@ public class HousekeeperReprocessingService {
|
||||
this.housekeeperService = housekeeperService;
|
||||
this.partitionService = partitionService;
|
||||
this.queueFactory = queueFactory;
|
||||
this.producerProvider = producerProvider;
|
||||
this.producer = producerProvider.getHousekeeperReprocessingMsgProducer();
|
||||
this.submitTpi = TopicPartitionInfo.builder().topic(producer.getDefaultTopic()).build();
|
||||
}
|
||||
|
||||
@PostConstruct
|
||||
@ -101,7 +106,13 @@ public class HousekeeperReprocessingService {
|
||||
while (!stopped) {
|
||||
try {
|
||||
List<TbProtoQueueMsg<ToHousekeeperServiceMsg>> msgs = consumer.poll(pollInterval);
|
||||
if (msgs.isEmpty() || msgs.stream().anyMatch(msg -> msg.getValue().getTask().getTs() >= startTs)) { // msg batch size should be 1. otherwise some tasks won't be reprocessed immediately
|
||||
if (msgs.isEmpty() || msgs.stream().anyMatch(msg -> msg.getValue().getTask().getTs() >= startTs)) {
|
||||
// it's not time yet to process the message
|
||||
if (!consumer.isCommitSupported()) {
|
||||
for (TbProtoQueueMsg<ToHousekeeperServiceMsg> msg : msgs) {
|
||||
producer.send(submitTpi, new TbProtoQueueMsg<>(msg.getKey(), msg.getValue()), null);
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
@ -114,7 +125,6 @@ public class HousekeeperReprocessingService {
|
||||
} catch (Throwable e) {
|
||||
log.error("Unexpected error during message reprocessing [{}]", msg, e);
|
||||
submitForReprocessing(msg.getValue(), e);
|
||||
// fixme: msgs are duplicated
|
||||
}
|
||||
}
|
||||
consumer.commit();
|
||||
@ -150,16 +160,18 @@ public class HousekeeperReprocessingService {
|
||||
.build();
|
||||
|
||||
log.trace("Submitting for reprocessing: {}", msg);
|
||||
var producer = producerProvider.getHousekeeperReprocessingMsgProducer();
|
||||
TopicPartitionInfo tpi = TopicPartitionInfo.builder().topic(producer.getDefaultTopic()).build();
|
||||
producer.send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), msg), null); // reprocessing topic has single partition, so we don't care about the msg key
|
||||
producer.send(submitTpi, new TbProtoQueueMsg<>(UUID.randomUUID(), msg), null); // reprocessing topic has single partition, so we don't care about the msg key
|
||||
}
|
||||
|
||||
@PreDestroy
|
||||
private void stop() {
|
||||
private void stop() throws Exception {
|
||||
stopped = true;
|
||||
scheduler.shutdownNow();
|
||||
consumerExecutor.shutdown();
|
||||
if (!consumerExecutor.awaitTermination(10, TimeUnit.SECONDS)) {
|
||||
consumerExecutor.shutdownNow();
|
||||
}
|
||||
log.info("Stopped Housekeeper reprocessing service");
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -1427,8 +1427,10 @@ queue:
|
||||
ota-updates: "${TB_QUEUE_KAFKA_OTA_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:10;min.insync.replicas:1}"
|
||||
# Kafka properties for Version Control topic
|
||||
version-control: "${TB_QUEUE_KAFKA_VC_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
|
||||
# Kafka properties for Housekeeper tasks topic
|
||||
housekeeper: "retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:10;min.insync.replicas:1"
|
||||
housekeeper-reprocessing: "retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1"
|
||||
# Kafka properties for Housekeeper reprocessing topic; retention.ms is set to 90 days
|
||||
housekeeper-reprocessing: "retention.ms:7776000000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1"
|
||||
consumer-stats:
|
||||
# Prints lag between consumer group offset and last messages offset in Kafka topics
|
||||
enabled: "${TB_QUEUE_KAFKA_CONSUMER_STATS_ENABLED:true}"
|
||||
@ -1585,12 +1587,13 @@ queue:
|
||||
# Statistics printing interval for Core microservices
|
||||
print-interval-ms: "${TB_QUEUE_CORE_STATS_PRINT_INTERVAL_MS:60000}"
|
||||
housekeeper:
|
||||
enabled: "${TB_HOUSEKEEPER_ENABLED:true}"
|
||||
topic: "${TB_HOUSEKEEPER_TOPIC:tb_housekeeper}"
|
||||
reprocessing-topic: "${TB_HOUSEKEEPER_REPROCESSING_TOPIC:tb_housekeeper.reprocessing}"
|
||||
poll-interval-ms: "${TB_HOUSEKEEPER_POLL_INTERVAL_MS:500}"
|
||||
task-processing-timeout-ms: "${TB_HOUSEKEEPER_TASK_PROCESSING_TIMEOUT_MS:120000}"
|
||||
reprocessing-start-delay-sec: "${TB_HOUSEKEEPER_REPROCESSING_START_DELAY_SEC:15}" # fixme: to 5 minutes
|
||||
task-reprocessing-delay-sec: "${TB_HOUSEKEEPER_TASK_REPROCESSING_DELAY_SEC:30}" # fixme: to 30 minutes or 1 hour
|
||||
reprocessing-start-delay-sec: "${TB_HOUSEKEEPER_REPROCESSING_START_DELAY_SEC:300}"
|
||||
task-reprocessing-delay-sec: "${TB_HOUSEKEEPER_TASK_REPROCESSING_DELAY_SEC:3600}"
|
||||
max-reprocessing-attempts: "${TB_HOUSEKEEPER_MAX_REPROCESSING_ATTEMPTS:30}"
|
||||
|
||||
vc:
|
||||
|
||||
@ -20,6 +20,7 @@ import com.fasterxml.jackson.databind.node.TextNode;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.mockito.ArgumentMatcher;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.test.mock.mockito.SpyBean;
|
||||
import org.springframework.test.context.TestPropertySource;
|
||||
@ -33,6 +34,7 @@ import org.thingsboard.server.common.data.alarm.Alarm;
|
||||
import org.thingsboard.server.common.data.alarm.AlarmSeverity;
|
||||
import org.thingsboard.server.common.data.event.EventType;
|
||||
import org.thingsboard.server.common.data.event.LifecycleEvent;
|
||||
import org.thingsboard.server.common.data.housekeeper.HousekeeperTask;
|
||||
import org.thingsboard.server.common.data.housekeeper.HousekeeperTaskType;
|
||||
import org.thingsboard.server.common.data.id.EntityId;
|
||||
import org.thingsboard.server.common.data.id.RuleChainId;
|
||||
@ -56,33 +58,46 @@ import org.thingsboard.server.controller.AbstractControllerTest;
|
||||
import org.thingsboard.server.dao.alarm.AlarmService;
|
||||
import org.thingsboard.server.dao.attributes.AttributesService;
|
||||
import org.thingsboard.server.dao.event.EventService;
|
||||
import org.thingsboard.server.dao.housekeeper.HousekeeperService;
|
||||
import org.thingsboard.server.dao.rule.RuleChainService;
|
||||
import org.thingsboard.server.dao.service.DaoSqlTest;
|
||||
import org.thingsboard.server.dao.timeseries.TimeseriesService;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.HousekeeperTaskProto;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToHousekeeperServiceMsg;
|
||||
import org.thingsboard.server.service.housekeeper.processor.TelemetryDeletionTaskProcessor;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.Predicate;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.awaitility.Awaitility.await;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.ArgumentMatchers.argThat;
|
||||
import static org.mockito.Mockito.doCallRealMethod;
|
||||
import static org.mockito.Mockito.doThrow;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
|
||||
|
||||
@DaoSqlTest
|
||||
@TestPropertySource(properties = {
|
||||
"transport.http.enabled=true"
|
||||
"queue.core.housekeeper.enabled=true",
|
||||
"transport.http.enabled=true",
|
||||
"queue.core.housekeeper.reprocessing-start-delay-sec=1",
|
||||
"queue.core.housekeeper.task-reprocessing-delay-sec=2",
|
||||
"queue.core.housekeeper.poll-interval-ms=1000"
|
||||
})
|
||||
public class HousekeeperServiceTest extends AbstractControllerTest {
|
||||
|
||||
@SpyBean
|
||||
private HousekeeperService housekeeperService;
|
||||
private DefaultHousekeeperService housekeeperService;
|
||||
@SpyBean
|
||||
private HousekeeperReprocessingService housekeeperReprocessingService;
|
||||
@Autowired
|
||||
private EventService eventService;
|
||||
@Autowired
|
||||
@ -93,6 +108,8 @@ public class HousekeeperServiceTest extends AbstractControllerTest {
|
||||
private RuleChainService ruleChainService;
|
||||
@Autowired
|
||||
private AlarmService alarmService;
|
||||
@SpyBean
|
||||
private TelemetryDeletionTaskProcessor telemetryDeletionTaskProcessor;
|
||||
|
||||
private TenantId tenantId;
|
||||
|
||||
@ -112,7 +129,7 @@ public class HousekeeperServiceTest extends AbstractControllerTest {
|
||||
|
||||
@Test
|
||||
public void whenDeviceIsDeleted_thenCleanUpRelatedData() throws Exception {
|
||||
Device device = createDevice("test", "test");
|
||||
Device device = createDevice("wekfwepf", "wekfwepf");
|
||||
createRelatedData(device.getId());
|
||||
|
||||
doDelete("/api/device/" + device.getId()).andExpect(status().isOk());
|
||||
@ -143,7 +160,7 @@ public class HousekeeperServiceTest extends AbstractControllerTest {
|
||||
|
||||
@Test
|
||||
public void whenUserIsDeleted_thenCleanUpRelatedData() throws Exception {
|
||||
Device device = createDevice("test", "test");
|
||||
Device device = createDevice("vneoruvhwe", "vneoruvhwe");
|
||||
UserId userId = customerUserId;
|
||||
createRelatedData(userId);
|
||||
Alarm alarm = Alarm.builder()
|
||||
@ -172,7 +189,7 @@ public class HousekeeperServiceTest extends AbstractControllerTest {
|
||||
|
||||
createRelatedData(tenantId);
|
||||
|
||||
Device device = createDevice("test", "test");
|
||||
Device device = createDevice("oi324rujoi", "oi324rujoi");
|
||||
createRelatedData(device.getId());
|
||||
|
||||
RuleChainMetaData ruleChainMetaData = createRuleChain();
|
||||
@ -199,6 +216,45 @@ public class HousekeeperServiceTest extends AbstractControllerTest {
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
public void whenTaskProcessingFails_thenReprocessUntilSuccessful() throws Exception {
|
||||
TimeoutException error = new TimeoutException("Test timeout");
|
||||
doThrow(error).when(telemetryDeletionTaskProcessor).process(any());
|
||||
|
||||
Device device = createDevice("vep9ruv32", "vep9ruv32");
|
||||
createRelatedData(device.getId());
|
||||
|
||||
doDelete("/api/device/" + device.getId()).andExpect(status().isOk());
|
||||
|
||||
int attempts = 3;
|
||||
await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> {
|
||||
verify(housekeeperService).processTask(argThat(verifyTaskSubmission(device.getId(), HousekeeperTaskType.DELETE_TELEMETRY,
|
||||
task -> task.getErrorsCount() == 0)));
|
||||
|
||||
for (int i = 1; i <= attempts; i++) {
|
||||
int attempt = i;
|
||||
verify(housekeeperReprocessingService).submitForReprocessing(argThat(verifyTaskSubmission(device.getId(), HousekeeperTaskType.DELETE_TELEMETRY,
|
||||
task -> task.getErrorsCount() > 0 && task.getAttempt() == attempt)), argThat(e -> e.getMessage().equals(error.getMessage())));
|
||||
verify(housekeeperService).processTask(argThat(verifyTaskSubmission(device.getId(), HousekeeperTaskType.DELETE_TELEMETRY,
|
||||
task -> task.getErrorsCount() > 0 && task.getAttempt() == attempt)));
|
||||
}
|
||||
});
|
||||
|
||||
assertThat(getTimeseriesHistory(device.getId())).isNotEmpty();
|
||||
doCallRealMethod().when(telemetryDeletionTaskProcessor).process(any()); // fixing the code
|
||||
await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> {
|
||||
assertThat(getTimeseriesHistory(device.getId())).isEmpty();
|
||||
});
|
||||
}
|
||||
|
||||
private ArgumentMatcher<ToHousekeeperServiceMsg> verifyTaskSubmission(EntityId entityId, HousekeeperTaskType taskType,
|
||||
Predicate<HousekeeperTaskProto> additionalCheck) {
|
||||
return msg -> {
|
||||
HousekeeperTask task = JacksonUtil.fromString(msg.getTask().getValue(), HousekeeperTask.class);
|
||||
return task.getEntityId().equals(entityId) && task.getTaskType() == taskType && additionalCheck.test(msg.getTask());
|
||||
};
|
||||
}
|
||||
|
||||
private void createRelatedData(EntityId entityId) throws Exception {
|
||||
createTelemetry(entityId);
|
||||
for (String scope : List.of(DataConstants.SERVER_SCOPE, DataConstants.SHARED_SCOPE, DataConstants.CLIENT_SCOPE)) {
|
||||
|
||||
@ -54,3 +54,4 @@ sql.edge_events.partition_size=168
|
||||
sql.ttl.edge_events.edge_event_ttl=2592000
|
||||
|
||||
server.log_controller_error_stack_trace=false
|
||||
queue.core.housekeeper.enabled=false
|
||||
|
||||
@ -36,6 +36,8 @@ public interface TbQueueConsumer<T extends TbQueueMsg> {
|
||||
|
||||
void commit();
|
||||
|
||||
boolean isCommitSupported();
|
||||
|
||||
boolean isStopped();
|
||||
|
||||
List<String> getFullTopicNames();
|
||||
|
||||
@ -201,6 +201,11 @@ public abstract class AbstractTbQueueConsumerTemplate<R, T extends TbQueueMsg> i
|
||||
return partitions.stream().map(TopicPartitionInfo::getFullTopicName).collect(Collectors.toList());
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isCommitSupported() {
|
||||
return true;
|
||||
}
|
||||
|
||||
protected boolean isLongPollingSupported() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@ -114,4 +114,9 @@ public class InMemoryTbQueueConsumer<T extends TbQueueMsg> implements TbQueueCon
|
||||
return partitions.stream().map(TopicPartitionInfo::getFullTopicName).collect(Collectors.toList());
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isCommitSupported() {
|
||||
return false;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -58,7 +58,7 @@ public class CleanUpService {
|
||||
}
|
||||
|
||||
public void cleanUpRelatedData(TenantId tenantId, EntityId entityId) {
|
||||
log.trace("[{}][{}][{}] Cleaning up related data", tenantId, entityId.getEntityType(), entityId.getId());
|
||||
log.debug("[{}][{}][{}] Cleaning up related data", tenantId, entityId.getEntityType(), entityId.getId());
|
||||
// todo: skipped entities list
|
||||
relationService.deleteEntityRelations(tenantId, entityId);
|
||||
housekeeperService.ifPresent(housekeeperService -> {
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user