From e1923d0ce0f208ca9cdfa205ff611ba58d3c1181 Mon Sep 17 00:00:00 2001 From: Andrii Landiak Date: Tue, 11 Mar 2025 10:43:34 +0200 Subject: [PATCH] Improve KafkaEdgeTopicsCleanUpService --- .../service/ttl/KafkaEdgeTopicsCleanUpService.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/ttl/KafkaEdgeTopicsCleanUpService.java b/application/src/main/java/org/thingsboard/server/service/ttl/KafkaEdgeTopicsCleanUpService.java index 5a74f9f69a..73712542fd 100644 --- a/application/src/main/java/org/thingsboard/server/service/ttl/KafkaEdgeTopicsCleanUpService.java +++ b/application/src/main/java/org/thingsboard/server/service/ttl/KafkaEdgeTopicsCleanUpService.java @@ -53,8 +53,6 @@ import static org.thingsboard.server.service.state.DefaultDeviceStateService.LAS @ConditionalOnExpression("'${queue.type:null}'=='kafka' && ${edges.enabled:true} && ${sql.ttl.edge_events.edge_events_ttl:0} > 0") public class KafkaEdgeTopicsCleanUpService extends AbstractCleanUpService { - private static final String EDGE_EVENT_TOPIC_NAME = "tb_edge_event.notifications."; - private final TopicService topicService; private final TenantService tenantService; private final EdgeService edgeService; @@ -64,6 +62,9 @@ public class KafkaEdgeTopicsCleanUpService extends AbstractCleanUpService { @Value("${sql.ttl.edge_events.edge_events_ttl:2628000}") private long ttlSeconds; + @Value("${queue.edge.event-notifications-topic:tb_edge_event.notifications}") + private String tbEdgeEventNotificationsTopic; + public KafkaEdgeTopicsCleanUpService(PartitionService partitionService, EdgeService edgeService, TenantService tenantService, AttributesService attributesService, TopicService topicService, TbKafkaSettings kafkaSettings, TbKafkaTopicConfigs kafkaTopicConfigs) { @@ -86,7 +87,7 @@ public class KafkaEdgeTopicsCleanUpService extends AbstractCleanUpService { return; } - String edgeTopicPrefix = topicService.buildTopicName(EDGE_EVENT_TOPIC_NAME); + String edgeTopicPrefix = topicService.buildTopicName(tbEdgeEventNotificationsTopic); List matchingTopics = topics.stream().filter(topic -> topic.startsWith(edgeTopicPrefix)).toList(); if (matchingTopics.isEmpty()) { log.debug("No matching topics found with prefix [{}]. Skipping cleanup.", edgeTopicPrefix); @@ -147,7 +148,7 @@ public class KafkaEdgeTopicsCleanUpService extends AbstractCleanUpService { try { String remaining = topic.substring(prefix.length()); String[] parts = remaining.split("\\."); - TenantId tenantId = new TenantId(UUID.fromString(parts[0])); + TenantId tenantId = TenantId.fromUUID(UUID.fromString(parts[0])); EdgeId edgeId = new EdgeId(UUID.fromString(parts[1])); tenantEdgeMap.computeIfAbsent(tenantId, id -> new ArrayList<>()).add(edgeId); } catch (Exception e) {