Housekeeper refactoring
This commit is contained in:
		
							parent
							
								
									881a8048ce
								
							
						
					
					
						commit
						6f9e9305bd
					
				@ -40,7 +40,6 @@ import java.util.Set;
 | 
			
		||||
import java.util.UUID;
 | 
			
		||||
import java.util.concurrent.ExecutorService;
 | 
			
		||||
import java.util.concurrent.Executors;
 | 
			
		||||
import java.util.concurrent.atomic.AtomicInteger;
 | 
			
		||||
 | 
			
		||||
@TbCoreComponent
 | 
			
		||||
@Service
 | 
			
		||||
@ -55,8 +54,6 @@ public class HousekeeperReprocessingService {
 | 
			
		||||
 | 
			
		||||
    private final ExecutorService consumerExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("housekeeper-reprocessing-consumer"));
 | 
			
		||||
 | 
			
		||||
    protected AtomicInteger cycle = new AtomicInteger();
 | 
			
		||||
 | 
			
		||||
    public HousekeeperReprocessingService(HousekeeperConfig config,
 | 
			
		||||
                                          @Lazy HousekeeperService housekeeperService,
 | 
			
		||||
                                          TbCoreQueueFactory queueFactory) {
 | 
			
		||||
 | 
			
		||||
@ -24,7 +24,6 @@ import org.thingsboard.server.common.data.housekeeper.HousekeeperTask;
 | 
			
		||||
import org.thingsboard.server.common.data.housekeeper.HousekeeperTaskType;
 | 
			
		||||
import org.thingsboard.server.common.data.notification.rule.trigger.TaskProcessingFailureTrigger;
 | 
			
		||||
import org.thingsboard.server.common.msg.notification.NotificationRuleProcessor;
 | 
			
		||||
import org.thingsboard.server.gen.transport.TransportProtos.HousekeeperTaskProto;
 | 
			
		||||
import org.thingsboard.server.gen.transport.TransportProtos.ToHousekeeperServiceMsg;
 | 
			
		||||
import org.thingsboard.server.queue.TbQueueConsumer;
 | 
			
		||||
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
 | 
			
		||||
@ -118,15 +117,19 @@ public class HousekeeperService {
 | 
			
		||||
            throw new IllegalArgumentException("Unsupported task type " + taskType);
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        if (log.isDebugEnabled()) {
 | 
			
		||||
            log.debug("[{}] {} {}", task.getTenantId(), isNew(msg.getTask()) ? "Processing" : "Reprocessing", task.getDescription());
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        try {
 | 
			
		||||
            long startTs = System.currentTimeMillis();
 | 
			
		||||
            Future<Object> future = taskExecutor.submit(() -> {
 | 
			
		||||
                taskProcessor.process((T) task);
 | 
			
		||||
                return null;
 | 
			
		||||
            });
 | 
			
		||||
            future.get(config.getTaskProcessingTimeout(), TimeUnit.MILLISECONDS);
 | 
			
		||||
 | 
			
		||||
            long timing = System.currentTimeMillis() - startTs;
 | 
			
		||||
            if (log.isDebugEnabled()) {
 | 
			
		||||
                log.debug("[{}] Processed {} in {} ms (attempt {})", task.getTenantId(), task.getDescription(), timing, msg.getTask().getAttempt());
 | 
			
		||||
            }
 | 
			
		||||
            statsService.ifPresent(statsService -> statsService.reportProcessed(taskType, msg));
 | 
			
		||||
        } catch (InterruptedException e) {
 | 
			
		||||
            throw e;
 | 
			
		||||
@ -137,14 +140,13 @@ public class HousekeeperService {
 | 
			
		||||
            } else if (e instanceof TimeoutException) {
 | 
			
		||||
                error = new TimeoutException("Timeout after " + config.getTaskProcessingTimeout() + " seconds");
 | 
			
		||||
            }
 | 
			
		||||
            log.error("[{}][{}][{}] {} task processing failed, submitting for reprocessing (attempt {}): {}",
 | 
			
		||||
                    task.getTenantId(), task.getEntityId().getEntityType(), task.getEntityId(),
 | 
			
		||||
                    taskType, msg.getTask().getAttempt(), task, error);
 | 
			
		||||
 | 
			
		||||
            if (msg.getTask().getAttempt() < config.getMaxReprocessingAttempts()) {
 | 
			
		||||
                log.warn("[{}] Failed to process {} (attempt {}), submitting for reprocessing",
 | 
			
		||||
                        task.getTenantId(), task.getDescription(), msg.getTask().getAttempt(), error);
 | 
			
		||||
                reprocessingService.submitForReprocessing(msg, error);
 | 
			
		||||
            } else {
 | 
			
		||||
                log.error("Failed to process task in {} attempts: {}", msg.getTask().getAttempt(), msg);
 | 
			
		||||
                log.error("[{}] Failed to process task in {} attempts: {}", task.getTenantId(), msg.getTask().getAttempt(), msg, e);
 | 
			
		||||
                notificationRuleProcessor.process(TaskProcessingFailureTrigger.builder()
 | 
			
		||||
                        .task(task)
 | 
			
		||||
                        .error(error)
 | 
			
		||||
@ -155,12 +157,8 @@ public class HousekeeperService {
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private boolean isNew(HousekeeperTaskProto task) {
 | 
			
		||||
        return task.getErrorsCount() == 0;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @PreDestroy
 | 
			
		||||
    private void stop() throws Exception {
 | 
			
		||||
    private void stop() {
 | 
			
		||||
        consumer.stop();
 | 
			
		||||
        consumerExecutor.shutdownNow();
 | 
			
		||||
        log.info("Stopped Housekeeper service");
 | 
			
		||||
 | 
			
		||||
@ -69,6 +69,7 @@ public class HousekeeperStatsService {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public void reportProcessed(HousekeeperTaskType taskType, ToHousekeeperServiceMsg msg) {
 | 
			
		||||
        // todo: report timings
 | 
			
		||||
        HousekeeperStats stats = this.stats.get(taskType);
 | 
			
		||||
        if (msg.getTask().getErrorsCount() == 0) {
 | 
			
		||||
            stats.getProcessedCounter().increment();
 | 
			
		||||
 | 
			
		||||
@ -70,6 +70,7 @@ import org.thingsboard.server.actors.device.DeviceActorMessageProcessor;
 | 
			
		||||
import org.thingsboard.server.actors.device.SessionInfo;
 | 
			
		||||
import org.thingsboard.server.actors.device.ToDeviceRpcRequestMetadata;
 | 
			
		||||
import org.thingsboard.server.actors.service.DefaultActorService;
 | 
			
		||||
import org.thingsboard.server.common.data.AttributeScope;
 | 
			
		||||
import org.thingsboard.server.common.data.Customer;
 | 
			
		||||
import org.thingsboard.server.common.data.Device;
 | 
			
		||||
import org.thingsboard.server.common.data.DeviceProfile;
 | 
			
		||||
@ -102,7 +103,7 @@ import org.thingsboard.server.common.data.id.TenantId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.TenantProfileId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.UUIDBased;
 | 
			
		||||
import org.thingsboard.server.common.data.id.UserId;
 | 
			
		||||
import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
 | 
			
		||||
import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
 | 
			
		||||
import org.thingsboard.server.common.data.kv.StringDataEntry;
 | 
			
		||||
import org.thingsboard.server.common.data.page.PageData;
 | 
			
		||||
import org.thingsboard.server.common.data.page.PageLink;
 | 
			
		||||
@ -116,6 +117,7 @@ import org.thingsboard.server.common.data.tenant.profile.TenantProfileData;
 | 
			
		||||
import org.thingsboard.server.common.msg.session.FeatureType;
 | 
			
		||||
import org.thingsboard.server.config.ThingsboardSecurityConfiguration;
 | 
			
		||||
import org.thingsboard.server.dao.Dao;
 | 
			
		||||
import org.thingsboard.server.dao.attributes.AttributesService;
 | 
			
		||||
import org.thingsboard.server.dao.device.ClaimDevicesService;
 | 
			
		||||
import org.thingsboard.server.dao.tenant.TenantProfileService;
 | 
			
		||||
import org.thingsboard.server.dao.timeseries.TimeseriesService;
 | 
			
		||||
@ -231,7 +233,10 @@ public abstract class AbstractWebTest extends AbstractInMemoryStorageTest {
 | 
			
		||||
    private TbTenantProfileService tbTenantProfileService;
 | 
			
		||||
 | 
			
		||||
    @Autowired
 | 
			
		||||
    public TimeseriesService tsService;
 | 
			
		||||
    protected TimeseriesService tsService;
 | 
			
		||||
 | 
			
		||||
    @Autowired
 | 
			
		||||
    protected AttributesService attributesService;
 | 
			
		||||
 | 
			
		||||
    @Autowired
 | 
			
		||||
    protected DefaultActorService actorService;
 | 
			
		||||
@ -379,7 +384,7 @@ public abstract class AbstractWebTest extends AbstractInMemoryStorageTest {
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
        Awaitility.await("tenant cleanup finish").atMost(30, TimeUnit.SECONDS)
 | 
			
		||||
                .until(() -> tsService.findLatest(TenantId.SYS_TENANT_ID, tenantId, "test").get().isEmpty());
 | 
			
		||||
                .until(() -> attributesService.find(TenantId.SYS_TENANT_ID, tenantId, AttributeScope.SERVER_SCOPE, "test").get().isEmpty());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private List<Tenant> getAllTenants() throws Exception {
 | 
			
		||||
@ -444,7 +449,7 @@ public abstract class AbstractWebTest extends AbstractInMemoryStorageTest {
 | 
			
		||||
 | 
			
		||||
    protected Tenant createTenant(Tenant tenant) throws Exception {
 | 
			
		||||
        tenant = doPost("/api/tenant", tenant, Tenant.class);
 | 
			
		||||
        tsService.save(TenantId.SYS_TENANT_ID, tenant.getId(), new BasicTsKvEntry(System.currentTimeMillis(), new StringDataEntry("test", "test"))).get(); // creating marker ts to later know when Housekeeper finishes tenant cleanup
 | 
			
		||||
        attributesService.save(TenantId.SYS_TENANT_ID, tenant.getId(), AttributeScope.SERVER_SCOPE, new BaseAttributeKvEntry(System.currentTimeMillis(), new StringDataEntry("test", "test"))).get(); // creating marker attr to later know when Housekeeper finishes tenant cleanup
 | 
			
		||||
        return tenant;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -19,7 +19,6 @@ import com.fasterxml.jackson.databind.JsonNode;
 | 
			
		||||
import com.fasterxml.jackson.databind.node.TextNode;
 | 
			
		||||
import org.junit.After;
 | 
			
		||||
import org.junit.Before;
 | 
			
		||||
import org.junit.Ignore;
 | 
			
		||||
import org.junit.Test;
 | 
			
		||||
import org.mockito.ArgumentMatcher;
 | 
			
		||||
import org.mockito.Mockito;
 | 
			
		||||
@ -64,7 +63,6 @@ import org.thingsboard.server.common.data.rule.RuleChain;
 | 
			
		||||
import org.thingsboard.server.common.data.rule.RuleChainMetaData;
 | 
			
		||||
import org.thingsboard.server.common.data.rule.RuleChainType;
 | 
			
		||||
import org.thingsboard.server.common.data.rule.RuleNode;
 | 
			
		||||
import org.thingsboard.server.common.msg.housekeeper.HousekeeperClient;
 | 
			
		||||
import org.thingsboard.server.controller.AbstractControllerTest;
 | 
			
		||||
import org.thingsboard.server.dao.alarm.AlarmDao;
 | 
			
		||||
import org.thingsboard.server.dao.alarm.AlarmService;
 | 
			
		||||
@ -113,8 +111,6 @@ public class HousekeeperServiceTest extends AbstractControllerTest {
 | 
			
		||||
    @SpyBean
 | 
			
		||||
    private HousekeeperService housekeeperService;
 | 
			
		||||
    @SpyBean
 | 
			
		||||
    private HousekeeperClient housekeeperClient;
 | 
			
		||||
    @SpyBean
 | 
			
		||||
    private HousekeeperReprocessingService housekeeperReprocessingService;
 | 
			
		||||
    @Autowired
 | 
			
		||||
    private EventService eventService;
 | 
			
		||||
@ -236,14 +232,17 @@ public class HousekeeperServiceTest extends AbstractControllerTest {
 | 
			
		||||
        List<DeviceId> devices = new ArrayList<>();
 | 
			
		||||
        for (int i = 1; i <= 300; i++) {
 | 
			
		||||
            Device device = createDevice("test" + i, "test" + i);
 | 
			
		||||
            createRelatedData(device.getId());
 | 
			
		||||
            devices.add(device.getId());
 | 
			
		||||
        }
 | 
			
		||||
        DeviceId firstDevice = devices.get(0);
 | 
			
		||||
        createRelatedData(firstDevice);
 | 
			
		||||
        DeviceId lastDevice = devices.get(devices.size() - 1);
 | 
			
		||||
        createRelatedData(lastDevice);
 | 
			
		||||
 | 
			
		||||
        Asset asset = createAsset();
 | 
			
		||||
        createRelatedData(asset.getId());
 | 
			
		||||
        createRelation(devices.get(0), asset.getId());
 | 
			
		||||
        createAlarm(devices.get(0), asset.getId());
 | 
			
		||||
        createRelation(firstDevice, asset.getId());
 | 
			
		||||
        createAlarm(firstDevice, asset.getId());
 | 
			
		||||
 | 
			
		||||
        RuleChainMetaData ruleChainMetaData = createRuleChain();
 | 
			
		||||
        RuleChainId ruleChainId = ruleChainMetaData.getRuleChainId();
 | 
			
		||||
@ -261,7 +260,7 @@ public class HousekeeperServiceTest extends AbstractControllerTest {
 | 
			
		||||
        loginSysAdmin();
 | 
			
		||||
        deleteDifferentTenant();
 | 
			
		||||
 | 
			
		||||
        await().atMost(60, TimeUnit.SECONDS).untilAsserted(() -> {
 | 
			
		||||
        await().atMost(60, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS).untilAsserted(() -> {
 | 
			
		||||
            for (DeviceId deviceId : devices) {
 | 
			
		||||
                verifyNoRelatedData(deviceId);
 | 
			
		||||
            }
 | 
			
		||||
@ -277,7 +276,6 @@ public class HousekeeperServiceTest extends AbstractControllerTest {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    @Ignore // FIXME !!!
 | 
			
		||||
    public void whenTaskProcessingFails_thenReprocess() throws Exception {
 | 
			
		||||
        TimeoutException error = new TimeoutException("Test timeout");
 | 
			
		||||
        doThrow(error).when(tsHistoryDeletionTaskProcessor).process(any());
 | 
			
		||||
@ -289,12 +287,10 @@ public class HousekeeperServiceTest extends AbstractControllerTest {
 | 
			
		||||
 | 
			
		||||
        int attempts = 2;
 | 
			
		||||
        await().atMost(30, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS).untilAsserted(() -> {
 | 
			
		||||
            verifyTaskProcessing(device.getId(), HousekeeperTaskType.DELETE_TS_HISTORY, 0);
 | 
			
		||||
            for (int i = 1; i <= attempts; i++) {
 | 
			
		||||
            for (int i = 0; i <= attempts; i++) {
 | 
			
		||||
                int attempt = i;
 | 
			
		||||
                verify(housekeeperReprocessingService).submitForReprocessing(argThat(getTaskMatcher(device.getId(), HousekeeperTaskType.DELETE_TS_HISTORY,
 | 
			
		||||
                        task -> task.getErrorsCount() > 0 && task.getAttempt() == attempt)), argThat(e -> e.getMessage().equals(error.getMessage())));
 | 
			
		||||
                verifyTaskProcessing(device.getId(), HousekeeperTaskType.DELETE_TS_HISTORY, attempt);
 | 
			
		||||
                        task -> task.getAttempt() == attempt)), argThat(e -> e.getMessage().equals(error.getMessage())));
 | 
			
		||||
            }
 | 
			
		||||
        });
 | 
			
		||||
 | 
			
		||||
@ -306,8 +302,7 @@ public class HousekeeperServiceTest extends AbstractControllerTest {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    @Ignore // FIXME !!!
 | 
			
		||||
    public void whenReprocessingAttemptsExceeded_thenReprocessOnNextStartUp() throws Exception {
 | 
			
		||||
    public void whenReprocessingAttemptsExceeded_thenDropTheTask() throws Exception {
 | 
			
		||||
        TimeoutException error = new TimeoutException("Test timeout");
 | 
			
		||||
        doThrow(error).when(tsHistoryDeletionTaskProcessor).process(any());
 | 
			
		||||
 | 
			
		||||
@ -327,20 +322,10 @@ public class HousekeeperServiceTest extends AbstractControllerTest {
 | 
			
		||||
        doCallRealMethod().when(tsHistoryDeletionTaskProcessor).process(any());
 | 
			
		||||
        TimeUnit.SECONDS.sleep(2);
 | 
			
		||||
        verify(housekeeperService, never()).processTask(argThat(getTaskMatcher(device.getId(), HousekeeperTaskType.DELETE_TS_HISTORY, null)));
 | 
			
		||||
 | 
			
		||||
        housekeeperReprocessingService.cycle.set(0); // imitating start-up
 | 
			
		||||
        await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
 | 
			
		||||
            verifyTaskProcessing(device.getId(), HousekeeperTaskType.DELETE_TS_HISTORY, 6);
 | 
			
		||||
        });
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void verifyTaskProcessing(EntityId entityId, HousekeeperTaskType taskType, int expectedAttempt) throws Exception {
 | 
			
		||||
        try {
 | 
			
		||||
            verify(housekeeperService).processTask(argThat(getTaskMatcher(entityId, taskType, task -> task.getAttempt() == expectedAttempt)));
 | 
			
		||||
        } catch (Throwable e) {
 | 
			
		||||
            e.printStackTrace();
 | 
			
		||||
            throw e;
 | 
			
		||||
        }
 | 
			
		||||
        verify(housekeeperService).processTask(argThat(getTaskMatcher(entityId, taskType, task -> task.getAttempt() == expectedAttempt)));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private ArgumentMatcher<ToHousekeeperServiceMsg> getTaskMatcher(EntityId entityId, HousekeeperTaskType taskType,
 | 
			
		||||
 | 
			
		||||
@ -43,4 +43,9 @@ public class AlarmsDeletionHousekeeperTask extends HousekeeperTask {
 | 
			
		||||
        this.alarms = alarms;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public String getDescription() {
 | 
			
		||||
        return super.getDescription() + (alarms != null ? " (" + alarms + ")" : "");
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -49,7 +49,7 @@ public class EntitiesDeletionHousekeeperTask extends HousekeeperTask {
 | 
			
		||||
    @JsonIgnore
 | 
			
		||||
    @Override
 | 
			
		||||
    public String getDescription() {
 | 
			
		||||
        return entityType.getNormalName().toLowerCase() + "s deletion";
 | 
			
		||||
        return entityType.getNormalName().toLowerCase() + "s deletion" + (entities != null ? " (" + entities + ")" : "");
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -36,4 +36,9 @@ public class LatestTsDeletionHousekeeperTask extends HousekeeperTask {
 | 
			
		||||
        this.key = key;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public String getDescription() {
 | 
			
		||||
        return super.getDescription() + (key != null ? " for key '" + key + "'" : "");
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -36,4 +36,9 @@ public class TsHistoryDeletionHousekeeperTask extends HousekeeperTask {
 | 
			
		||||
        this.key = key;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public String getDescription() {
 | 
			
		||||
        return super.getDescription() + (key != null ? " for key '" + key + "'" : "");
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -22,8 +22,6 @@ import org.thingsboard.server.common.data.id.EntityId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.TenantId;
 | 
			
		||||
import org.thingsboard.server.common.data.notification.rule.trigger.config.NotificationRuleTriggerType;
 | 
			
		||||
 | 
			
		||||
import java.util.concurrent.TimeUnit;
 | 
			
		||||
 | 
			
		||||
@Data
 | 
			
		||||
@Builder
 | 
			
		||||
public class TaskProcessingFailureTrigger implements NotificationRuleTrigger {
 | 
			
		||||
@ -49,17 +47,7 @@ public class TaskProcessingFailureTrigger implements NotificationRuleTrigger {
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public boolean deduplicate() {
 | 
			
		||||
        return true;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public String getDeduplicationKey() {
 | 
			
		||||
        return String.join(":", NotificationRuleTrigger.super.getDeduplicationKey(), task.getTaskType().toString());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public long getDefaultDeduplicationDuration() {
 | 
			
		||||
        return TimeUnit.MINUTES.toMillis(30);
 | 
			
		||||
        return false;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user