Merge pull request #12877 from AndriiLandiak/fix-kafka-edge-cleanup
Fix KafkaEdgeTopicsCleanUpService
This commit is contained in:
commit
0aaccc7f4f
@ -53,8 +53,6 @@ import static org.thingsboard.server.service.state.DefaultDeviceStateService.LAS
|
|||||||
@ConditionalOnExpression("'${queue.type:null}'=='kafka' && ${edges.enabled:true} && ${sql.ttl.edge_events.edge_events_ttl:0} > 0")
|
@ConditionalOnExpression("'${queue.type:null}'=='kafka' && ${edges.enabled:true} && ${sql.ttl.edge_events.edge_events_ttl:0} > 0")
|
||||||
public class KafkaEdgeTopicsCleanUpService extends AbstractCleanUpService {
|
public class KafkaEdgeTopicsCleanUpService extends AbstractCleanUpService {
|
||||||
|
|
||||||
private static final String EDGE_EVENT_TOPIC_NAME = "tb_edge_event.notifications.";
|
|
||||||
|
|
||||||
private final TopicService topicService;
|
private final TopicService topicService;
|
||||||
private final TenantService tenantService;
|
private final TenantService tenantService;
|
||||||
private final EdgeService edgeService;
|
private final EdgeService edgeService;
|
||||||
@ -64,6 +62,9 @@ public class KafkaEdgeTopicsCleanUpService extends AbstractCleanUpService {
|
|||||||
@Value("${sql.ttl.edge_events.edge_events_ttl:2628000}")
|
@Value("${sql.ttl.edge_events.edge_events_ttl:2628000}")
|
||||||
private long ttlSeconds;
|
private long ttlSeconds;
|
||||||
|
|
||||||
|
@Value("${queue.edge.event-notifications-topic:tb_edge_event.notifications}")
|
||||||
|
private String tbEdgeEventNotificationsTopic;
|
||||||
|
|
||||||
public KafkaEdgeTopicsCleanUpService(PartitionService partitionService, EdgeService edgeService,
|
public KafkaEdgeTopicsCleanUpService(PartitionService partitionService, EdgeService edgeService,
|
||||||
TenantService tenantService, AttributesService attributesService,
|
TenantService tenantService, AttributesService attributesService,
|
||||||
TopicService topicService, TbKafkaSettings kafkaSettings, TbKafkaTopicConfigs kafkaTopicConfigs) {
|
TopicService topicService, TbKafkaSettings kafkaSettings, TbKafkaTopicConfigs kafkaTopicConfigs) {
|
||||||
@ -86,7 +87,7 @@ public class KafkaEdgeTopicsCleanUpService extends AbstractCleanUpService {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
String edgeTopicPrefix = topicService.buildTopicName(EDGE_EVENT_TOPIC_NAME);
|
String edgeTopicPrefix = topicService.buildTopicName(tbEdgeEventNotificationsTopic);
|
||||||
List<String> matchingTopics = topics.stream().filter(topic -> topic.startsWith(edgeTopicPrefix)).toList();
|
List<String> matchingTopics = topics.stream().filter(topic -> topic.startsWith(edgeTopicPrefix)).toList();
|
||||||
if (matchingTopics.isEmpty()) {
|
if (matchingTopics.isEmpty()) {
|
||||||
log.debug("No matching topics found with prefix [{}]. Skipping cleanup.", edgeTopicPrefix);
|
log.debug("No matching topics found with prefix [{}]. Skipping cleanup.", edgeTopicPrefix);
|
||||||
@ -147,7 +148,7 @@ public class KafkaEdgeTopicsCleanUpService extends AbstractCleanUpService {
|
|||||||
try {
|
try {
|
||||||
String remaining = topic.substring(prefix.length());
|
String remaining = topic.substring(prefix.length());
|
||||||
String[] parts = remaining.split("\\.");
|
String[] parts = remaining.split("\\.");
|
||||||
TenantId tenantId = new TenantId(UUID.fromString(parts[0]));
|
TenantId tenantId = TenantId.fromUUID(UUID.fromString(parts[0]));
|
||||||
EdgeId edgeId = new EdgeId(UUID.fromString(parts[1]));
|
EdgeId edgeId = new EdgeId(UUID.fromString(parts[1]));
|
||||||
tenantEdgeMap.computeIfAbsent(tenantId, id -> new ArrayList<>()).add(edgeId);
|
tenantEdgeMap.computeIfAbsent(tenantId, id -> new ArrayList<>()).add(edgeId);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user