diff --git a/application/src/main/java/org/thingsboard/server/service/housekeeper/HousekeeperReprocessingService.java b/application/src/main/java/org/thingsboard/server/service/housekeeper/HousekeeperReprocessingService.java index ba762facb1..cdc0d3f880 100644 --- a/application/src/main/java/org/thingsboard/server/service/housekeeper/HousekeeperReprocessingService.java +++ b/application/src/main/java/org/thingsboard/server/service/housekeeper/HousekeeperReprocessingService.java @@ -40,7 +40,6 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.atomic.AtomicInteger; @TbCoreComponent @Service @@ -55,8 +54,6 @@ public class HousekeeperReprocessingService { private final ExecutorService consumerExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("housekeeper-reprocessing-consumer")); - protected AtomicInteger cycle = new AtomicInteger(); - public HousekeeperReprocessingService(HousekeeperConfig config, @Lazy HousekeeperService housekeeperService, TbCoreQueueFactory queueFactory) { diff --git a/application/src/main/java/org/thingsboard/server/service/housekeeper/HousekeeperService.java b/application/src/main/java/org/thingsboard/server/service/housekeeper/HousekeeperService.java index b16c1b0de9..1f5ed1be58 100644 --- a/application/src/main/java/org/thingsboard/server/service/housekeeper/HousekeeperService.java +++ b/application/src/main/java/org/thingsboard/server/service/housekeeper/HousekeeperService.java @@ -24,7 +24,6 @@ import org.thingsboard.server.common.data.housekeeper.HousekeeperTask; import org.thingsboard.server.common.data.housekeeper.HousekeeperTaskType; import org.thingsboard.server.common.data.notification.rule.trigger.TaskProcessingFailureTrigger; import org.thingsboard.server.common.msg.notification.NotificationRuleProcessor; -import org.thingsboard.server.gen.transport.TransportProtos.HousekeeperTaskProto; import org.thingsboard.server.gen.transport.TransportProtos.ToHousekeeperServiceMsg; import org.thingsboard.server.queue.TbQueueConsumer; import org.thingsboard.server.queue.common.TbProtoQueueMsg; @@ -118,15 +117,19 @@ public class HousekeeperService { throw new IllegalArgumentException("Unsupported task type " + taskType); } - if (log.isDebugEnabled()) { - log.debug("[{}] {} {}", task.getTenantId(), isNew(msg.getTask()) ? "Processing" : "Reprocessing", task.getDescription()); - } + try { + long startTs = System.currentTimeMillis(); Future future = taskExecutor.submit(() -> { taskProcessor.process((T) task); return null; }); future.get(config.getTaskProcessingTimeout(), TimeUnit.MILLISECONDS); + + long timing = System.currentTimeMillis() - startTs; + if (log.isDebugEnabled()) { + log.debug("[{}] Processed {} in {} ms (attempt {})", task.getTenantId(), task.getDescription(), timing, msg.getTask().getAttempt()); + } statsService.ifPresent(statsService -> statsService.reportProcessed(taskType, msg)); } catch (InterruptedException e) { throw e; @@ -137,14 +140,13 @@ public class HousekeeperService { } else if (e instanceof TimeoutException) { error = new TimeoutException("Timeout after " + config.getTaskProcessingTimeout() + " seconds"); } - log.error("[{}][{}][{}] {} task processing failed, submitting for reprocessing (attempt {}): {}", - task.getTenantId(), task.getEntityId().getEntityType(), task.getEntityId(), - taskType, msg.getTask().getAttempt(), task, error); if (msg.getTask().getAttempt() < config.getMaxReprocessingAttempts()) { + log.warn("[{}] Failed to process {} (attempt {}), submitting for reprocessing", + task.getTenantId(), task.getDescription(), msg.getTask().getAttempt(), error); reprocessingService.submitForReprocessing(msg, error); } else { - log.error("Failed to process task in {} attempts: {}", msg.getTask().getAttempt(), msg); + log.error("[{}] Failed to process task in {} attempts: {}", task.getTenantId(), msg.getTask().getAttempt(), msg, e); notificationRuleProcessor.process(TaskProcessingFailureTrigger.builder() .task(task) .error(error) @@ -155,12 +157,8 @@ public class HousekeeperService { } } - private boolean isNew(HousekeeperTaskProto task) { - return task.getErrorsCount() == 0; - } - @PreDestroy - private void stop() throws Exception { + private void stop() { consumer.stop(); consumerExecutor.shutdownNow(); log.info("Stopped Housekeeper service"); diff --git a/application/src/main/java/org/thingsboard/server/service/housekeeper/stats/HousekeeperStatsService.java b/application/src/main/java/org/thingsboard/server/service/housekeeper/stats/HousekeeperStatsService.java index 4944cf9f43..da586aa820 100644 --- a/application/src/main/java/org/thingsboard/server/service/housekeeper/stats/HousekeeperStatsService.java +++ b/application/src/main/java/org/thingsboard/server/service/housekeeper/stats/HousekeeperStatsService.java @@ -69,6 +69,7 @@ public class HousekeeperStatsService { } public void reportProcessed(HousekeeperTaskType taskType, ToHousekeeperServiceMsg msg) { + // todo: report timings HousekeeperStats stats = this.stats.get(taskType); if (msg.getTask().getErrorsCount() == 0) { stats.getProcessedCounter().increment(); diff --git a/application/src/test/java/org/thingsboard/server/controller/AbstractWebTest.java b/application/src/test/java/org/thingsboard/server/controller/AbstractWebTest.java index 5dd9e4b77f..ee239d12db 100644 --- a/application/src/test/java/org/thingsboard/server/controller/AbstractWebTest.java +++ b/application/src/test/java/org/thingsboard/server/controller/AbstractWebTest.java @@ -70,6 +70,7 @@ import org.thingsboard.server.actors.device.DeviceActorMessageProcessor; import org.thingsboard.server.actors.device.SessionInfo; import org.thingsboard.server.actors.device.ToDeviceRpcRequestMetadata; import org.thingsboard.server.actors.service.DefaultActorService; +import org.thingsboard.server.common.data.AttributeScope; import org.thingsboard.server.common.data.Customer; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.DeviceProfile; @@ -102,7 +103,7 @@ import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.TenantProfileId; import org.thingsboard.server.common.data.id.UUIDBased; import org.thingsboard.server.common.data.id.UserId; -import org.thingsboard.server.common.data.kv.BasicTsKvEntry; +import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry; import org.thingsboard.server.common.data.kv.StringDataEntry; import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageLink; @@ -116,6 +117,7 @@ import org.thingsboard.server.common.data.tenant.profile.TenantProfileData; import org.thingsboard.server.common.msg.session.FeatureType; import org.thingsboard.server.config.ThingsboardSecurityConfiguration; import org.thingsboard.server.dao.Dao; +import org.thingsboard.server.dao.attributes.AttributesService; import org.thingsboard.server.dao.device.ClaimDevicesService; import org.thingsboard.server.dao.tenant.TenantProfileService; import org.thingsboard.server.dao.timeseries.TimeseriesService; @@ -231,7 +233,10 @@ public abstract class AbstractWebTest extends AbstractInMemoryStorageTest { private TbTenantProfileService tbTenantProfileService; @Autowired - public TimeseriesService tsService; + protected TimeseriesService tsService; + + @Autowired + protected AttributesService attributesService; @Autowired protected DefaultActorService actorService; @@ -379,7 +384,7 @@ public abstract class AbstractWebTest extends AbstractInMemoryStorageTest { } } Awaitility.await("tenant cleanup finish").atMost(30, TimeUnit.SECONDS) - .until(() -> tsService.findLatest(TenantId.SYS_TENANT_ID, tenantId, "test").get().isEmpty()); + .until(() -> attributesService.find(TenantId.SYS_TENANT_ID, tenantId, AttributeScope.SERVER_SCOPE, "test").get().isEmpty()); } private List getAllTenants() throws Exception { @@ -444,7 +449,7 @@ public abstract class AbstractWebTest extends AbstractInMemoryStorageTest { protected Tenant createTenant(Tenant tenant) throws Exception { tenant = doPost("/api/tenant", tenant, Tenant.class); - tsService.save(TenantId.SYS_TENANT_ID, tenant.getId(), new BasicTsKvEntry(System.currentTimeMillis(), new StringDataEntry("test", "test"))).get(); // creating marker ts to later know when Housekeeper finishes tenant cleanup + attributesService.save(TenantId.SYS_TENANT_ID, tenant.getId(), AttributeScope.SERVER_SCOPE, new BaseAttributeKvEntry(System.currentTimeMillis(), new StringDataEntry("test", "test"))).get(); // creating marker attr to later know when Housekeeper finishes tenant cleanup return tenant; } diff --git a/application/src/test/java/org/thingsboard/server/service/housekeeper/HousekeeperServiceTest.java b/application/src/test/java/org/thingsboard/server/service/housekeeper/HousekeeperServiceTest.java index a1015c4a4c..e66d3ed86a 100644 --- a/application/src/test/java/org/thingsboard/server/service/housekeeper/HousekeeperServiceTest.java +++ b/application/src/test/java/org/thingsboard/server/service/housekeeper/HousekeeperServiceTest.java @@ -19,7 +19,6 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.TextNode; import org.junit.After; import org.junit.Before; -import org.junit.Ignore; import org.junit.Test; import org.mockito.ArgumentMatcher; import org.mockito.Mockito; @@ -64,7 +63,6 @@ import org.thingsboard.server.common.data.rule.RuleChain; import org.thingsboard.server.common.data.rule.RuleChainMetaData; import org.thingsboard.server.common.data.rule.RuleChainType; import org.thingsboard.server.common.data.rule.RuleNode; -import org.thingsboard.server.common.msg.housekeeper.HousekeeperClient; import org.thingsboard.server.controller.AbstractControllerTest; import org.thingsboard.server.dao.alarm.AlarmDao; import org.thingsboard.server.dao.alarm.AlarmService; @@ -113,8 +111,6 @@ public class HousekeeperServiceTest extends AbstractControllerTest { @SpyBean private HousekeeperService housekeeperService; @SpyBean - private HousekeeperClient housekeeperClient; - @SpyBean private HousekeeperReprocessingService housekeeperReprocessingService; @Autowired private EventService eventService; @@ -236,14 +232,17 @@ public class HousekeeperServiceTest extends AbstractControllerTest { List devices = new ArrayList<>(); for (int i = 1; i <= 300; i++) { Device device = createDevice("test" + i, "test" + i); - createRelatedData(device.getId()); devices.add(device.getId()); } + DeviceId firstDevice = devices.get(0); + createRelatedData(firstDevice); + DeviceId lastDevice = devices.get(devices.size() - 1); + createRelatedData(lastDevice); Asset asset = createAsset(); createRelatedData(asset.getId()); - createRelation(devices.get(0), asset.getId()); - createAlarm(devices.get(0), asset.getId()); + createRelation(firstDevice, asset.getId()); + createAlarm(firstDevice, asset.getId()); RuleChainMetaData ruleChainMetaData = createRuleChain(); RuleChainId ruleChainId = ruleChainMetaData.getRuleChainId(); @@ -261,7 +260,7 @@ public class HousekeeperServiceTest extends AbstractControllerTest { loginSysAdmin(); deleteDifferentTenant(); - await().atMost(60, TimeUnit.SECONDS).untilAsserted(() -> { + await().atMost(60, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS).untilAsserted(() -> { for (DeviceId deviceId : devices) { verifyNoRelatedData(deviceId); } @@ -277,7 +276,6 @@ public class HousekeeperServiceTest extends AbstractControllerTest { } @Test - @Ignore // FIXME !!! public void whenTaskProcessingFails_thenReprocess() throws Exception { TimeoutException error = new TimeoutException("Test timeout"); doThrow(error).when(tsHistoryDeletionTaskProcessor).process(any()); @@ -289,12 +287,10 @@ public class HousekeeperServiceTest extends AbstractControllerTest { int attempts = 2; await().atMost(30, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS).untilAsserted(() -> { - verifyTaskProcessing(device.getId(), HousekeeperTaskType.DELETE_TS_HISTORY, 0); - for (int i = 1; i <= attempts; i++) { + for (int i = 0; i <= attempts; i++) { int attempt = i; verify(housekeeperReprocessingService).submitForReprocessing(argThat(getTaskMatcher(device.getId(), HousekeeperTaskType.DELETE_TS_HISTORY, - task -> task.getErrorsCount() > 0 && task.getAttempt() == attempt)), argThat(e -> e.getMessage().equals(error.getMessage()))); - verifyTaskProcessing(device.getId(), HousekeeperTaskType.DELETE_TS_HISTORY, attempt); + task -> task.getAttempt() == attempt)), argThat(e -> e.getMessage().equals(error.getMessage()))); } }); @@ -306,8 +302,7 @@ public class HousekeeperServiceTest extends AbstractControllerTest { } @Test - @Ignore // FIXME !!! - public void whenReprocessingAttemptsExceeded_thenReprocessOnNextStartUp() throws Exception { + public void whenReprocessingAttemptsExceeded_thenDropTheTask() throws Exception { TimeoutException error = new TimeoutException("Test timeout"); doThrow(error).when(tsHistoryDeletionTaskProcessor).process(any()); @@ -327,20 +322,10 @@ public class HousekeeperServiceTest extends AbstractControllerTest { doCallRealMethod().when(tsHistoryDeletionTaskProcessor).process(any()); TimeUnit.SECONDS.sleep(2); verify(housekeeperService, never()).processTask(argThat(getTaskMatcher(device.getId(), HousekeeperTaskType.DELETE_TS_HISTORY, null))); - - housekeeperReprocessingService.cycle.set(0); // imitating start-up - await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> { - verifyTaskProcessing(device.getId(), HousekeeperTaskType.DELETE_TS_HISTORY, 6); - }); } private void verifyTaskProcessing(EntityId entityId, HousekeeperTaskType taskType, int expectedAttempt) throws Exception { - try { - verify(housekeeperService).processTask(argThat(getTaskMatcher(entityId, taskType, task -> task.getAttempt() == expectedAttempt))); - } catch (Throwable e) { - e.printStackTrace(); - throw e; - } + verify(housekeeperService).processTask(argThat(getTaskMatcher(entityId, taskType, task -> task.getAttempt() == expectedAttempt))); } private ArgumentMatcher getTaskMatcher(EntityId entityId, HousekeeperTaskType taskType, diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/housekeeper/AlarmsDeletionHousekeeperTask.java b/common/data/src/main/java/org/thingsboard/server/common/data/housekeeper/AlarmsDeletionHousekeeperTask.java index 9bd8f29bb4..60a50916ef 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/housekeeper/AlarmsDeletionHousekeeperTask.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/housekeeper/AlarmsDeletionHousekeeperTask.java @@ -43,4 +43,9 @@ public class AlarmsDeletionHousekeeperTask extends HousekeeperTask { this.alarms = alarms; } + @Override + public String getDescription() { + return super.getDescription() + (alarms != null ? " (" + alarms + ")" : ""); + } + } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/housekeeper/EntitiesDeletionHousekeeperTask.java b/common/data/src/main/java/org/thingsboard/server/common/data/housekeeper/EntitiesDeletionHousekeeperTask.java index e29c3d3c51..d90fa6de94 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/housekeeper/EntitiesDeletionHousekeeperTask.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/housekeeper/EntitiesDeletionHousekeeperTask.java @@ -49,7 +49,7 @@ public class EntitiesDeletionHousekeeperTask extends HousekeeperTask { @JsonIgnore @Override public String getDescription() { - return entityType.getNormalName().toLowerCase() + "s deletion"; + return entityType.getNormalName().toLowerCase() + "s deletion" + (entities != null ? " (" + entities + ")" : ""); } } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/housekeeper/LatestTsDeletionHousekeeperTask.java b/common/data/src/main/java/org/thingsboard/server/common/data/housekeeper/LatestTsDeletionHousekeeperTask.java index cef43756e0..4f0f56479e 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/housekeeper/LatestTsDeletionHousekeeperTask.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/housekeeper/LatestTsDeletionHousekeeperTask.java @@ -36,4 +36,9 @@ public class LatestTsDeletionHousekeeperTask extends HousekeeperTask { this.key = key; } + @Override + public String getDescription() { + return super.getDescription() + (key != null ? " for key '" + key + "'" : ""); + } + } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/housekeeper/TsHistoryDeletionHousekeeperTask.java b/common/data/src/main/java/org/thingsboard/server/common/data/housekeeper/TsHistoryDeletionHousekeeperTask.java index 80b8edc345..8f462f2dff 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/housekeeper/TsHistoryDeletionHousekeeperTask.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/housekeeper/TsHistoryDeletionHousekeeperTask.java @@ -36,4 +36,9 @@ public class TsHistoryDeletionHousekeeperTask extends HousekeeperTask { this.key = key; } + @Override + public String getDescription() { + return super.getDescription() + (key != null ? " for key '" + key + "'" : ""); + } + } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/notification/rule/trigger/TaskProcessingFailureTrigger.java b/common/data/src/main/java/org/thingsboard/server/common/data/notification/rule/trigger/TaskProcessingFailureTrigger.java index 9ccfac6aff..3626e09c0b 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/notification/rule/trigger/TaskProcessingFailureTrigger.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/notification/rule/trigger/TaskProcessingFailureTrigger.java @@ -22,8 +22,6 @@ import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.notification.rule.trigger.config.NotificationRuleTriggerType; -import java.util.concurrent.TimeUnit; - @Data @Builder public class TaskProcessingFailureTrigger implements NotificationRuleTrigger { @@ -49,17 +47,7 @@ public class TaskProcessingFailureTrigger implements NotificationRuleTrigger { @Override public boolean deduplicate() { - return true; - } - - @Override - public String getDeduplicationKey() { - return String.join(":", NotificationRuleTrigger.super.getDeduplicationKey(), task.getTaskType().toString()); - } - - @Override - public long getDefaultDeduplicationDuration() { - return TimeUnit.MINUTES.toMillis(30); + return false; } }