From 1099ca2aa2457dcbdebdb991c3fa9315c8f0acdf Mon Sep 17 00:00:00 2001 From: Andrii Landiak Date: Fri, 29 Nov 2024 15:45:25 +0200 Subject: [PATCH] Improvement --- .../ttl/KafkaEdgeTopicsCleanUpService.java | 17 ++++++----------- 1 file changed, 6 insertions(+), 11 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/ttl/KafkaEdgeTopicsCleanUpService.java b/application/src/main/java/org/thingsboard/server/service/ttl/KafkaEdgeTopicsCleanUpService.java index d801dc2bbe..b28664e3be 100644 --- a/application/src/main/java/org/thingsboard/server/service/ttl/KafkaEdgeTopicsCleanUpService.java +++ b/application/src/main/java/org/thingsboard/server/service/ttl/KafkaEdgeTopicsCleanUpService.java @@ -43,7 +43,6 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; -import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import static org.thingsboard.server.service.state.DefaultDeviceStateService.LAST_CONNECT_TIME; @@ -60,9 +59,7 @@ public class KafkaEdgeTopicsCleanUpService extends AbstractCleanUpService { private final TenantService tenantService; private final EdgeService edgeService; private final AttributesService attributesService; - - private final TbKafkaSettings kafkaSettings; - private final TbKafkaTopicConfigs kafkaTopicConfigs; + private final TbKafkaAdmin kafkaAdmin; @Value("${sql.ttl.edge_events.edge_events_ttl:2628000}") private long ttlSeconds; @@ -75,8 +72,7 @@ public class KafkaEdgeTopicsCleanUpService extends AbstractCleanUpService { this.tenantService = tenantService; this.edgeService = edgeService; this.attributesService = attributesService; - this.kafkaSettings = kafkaSettings; - this.kafkaTopicConfigs = kafkaTopicConfigs; + this.kafkaAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getEdgeEventConfigs()); } @Scheduled(initialDelayString = "#{T(org.apache.commons.lang3.RandomUtils).nextLong(0, ${sql.ttl.edge_events.execution_interval_ms})}", fixedDelayString = "${sql.ttl.edge_events.execution_interval_ms}") @@ -85,7 +81,6 @@ public class KafkaEdgeTopicsCleanUpService extends AbstractCleanUpService { return; } - TbKafkaAdmin kafkaAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getEdgeEventConfigs()); Set topics = kafkaAdmin.getAllTopics(); if (topics == null || topics.isEmpty()) { return; @@ -103,10 +98,10 @@ public class KafkaEdgeTopicsCleanUpService extends AbstractCleanUpService { long currentTimeMillis = System.currentTimeMillis(); long ttlMillis = TimeUnit.SECONDS.toMillis(ttlSeconds); - tenantEdgeMap.forEach((tenantId, edgeIds) -> processTenantCleanUp(kafkaAdmin, tenantId, edgeIds, ttlMillis, currentTimeMillis)); + tenantEdgeMap.forEach((tenantId, edgeIds) -> processTenantCleanUp(tenantId, edgeIds, ttlMillis, currentTimeMillis)); } - private void processTenantCleanUp(TbKafkaAdmin kafkaAdmin, TenantId tenantId, List edgeIds, long ttlMillis, long currentTimeMillis) { + private void processTenantCleanUp(TenantId tenantId, List edgeIds, long ttlMillis, long currentTimeMillis) { boolean tenantExists = tenantService.tenantExists(tenantId); if (tenantExists) { for (EdgeId edgeId : edgeIds) { @@ -129,8 +124,8 @@ public class KafkaEdgeTopicsCleanUpService extends AbstractCleanUpService { log.info("[{}] Removed topic {} for deleted edge {}", tenantId, topic, edgeId); } }); - } catch (InterruptedException | ExecutionException e) { - log.error("[{}] Failed to delete topic", tenantId); + } catch (Exception e) { + log.error("[{}] Failed to delete topic for edge {}", tenantId, edgeId, e); } } } else {