diff --git a/application/src/main/java/org/thingsboard/server/service/ttl/KafkaEdgeTopicsCleanUpService.java b/application/src/main/java/org/thingsboard/server/service/ttl/KafkaEdgeTopicsCleanUpService.java index c32ed92e31..73712542fd 100644 --- a/application/src/main/java/org/thingsboard/server/service/ttl/KafkaEdgeTopicsCleanUpService.java +++ b/application/src/main/java/org/thingsboard/server/service/ttl/KafkaEdgeTopicsCleanUpService.java @@ -62,7 +62,7 @@ public class KafkaEdgeTopicsCleanUpService extends AbstractCleanUpService { @Value("${sql.ttl.edge_events.edge_events_ttl:2628000}") private long ttlSeconds; - @Value("${queue.edge.event_notifications_topic:tb_edge_event.notifications}") + @Value("${queue.edge.event-notifications-topic:tb_edge_event.notifications}") private String tbEdgeEventNotificationsTopic; public KafkaEdgeTopicsCleanUpService(PartitionService partitionService, EdgeService edgeService, diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/TopicService.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/TopicService.java index 4b6c0aa139..7271e1cb10 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/TopicService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/TopicService.java @@ -32,22 +32,22 @@ public class TopicService { @Value("${queue.prefix:}") private String prefix; - @Value("${queue.core.notifications_topic:tb_core.notifications}") + @Value("${queue.core.notifications-topic:tb_core.notifications}") private String tbCoreNotificationsTopic; - @Value("${queue.rule-engine.notifications_topic:tb_rule_engine.notifications}") + @Value("${queue.rule-engine.notifications-topic:tb_rule_engine.notifications}") private String tbRuleEngineNotificationsTopic; - @Value("${queue.transport.notifications_topics:tb_transport.notifications}") + @Value("${queue.transport.notifications-topic:tb_transport.notifications}") private String tbTransportNotificationsTopic; - @Value("${queue.edge.notifications_topic:tb_edge.notifications}") + @Value("${queue.edge.notifications-topic:tb_edge.notifications}") private String tbEdgeNotificationsTopic; - @Value("${queue.edge.event_notifications_topic:tb_edge_event.notifications}") + @Value("${queue.edge.event-notifications-topic:tb_edge_event.notifications}") private String tbEdgeEventNotificationsTopic; - @Value("${queue.calculated-fields.notifications_topic:calculated_field.notifications}") + @Value("${queue.calculated-fields.notifications-topic:calculated_field.notifications}") private String tbCalculatedFieldNotificationsTopic; private final ConcurrentMap tbCoreNotificationTopics = new ConcurrentHashMap<>(); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaSettings.java b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaSettings.java index d447d92cb3..82b5af179f 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaSettings.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaSettings.java @@ -50,8 +50,6 @@ import java.util.Properties; @Component public class TbKafkaSettings { - private static final List DYNAMIC_TOPICS = List.of("tb_edge.notifications", "tb_edge_event.notifications"); - @Value("${queue.kafka.bootstrap.servers}") private String servers; @@ -163,18 +161,20 @@ public class TbKafkaSettings { props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); - consumerPropertiesPerTopic - .getOrDefault(topic, Collections.emptyList()) - .forEach(kv -> props.put(kv.getKey(), kv.getValue())); - if (topic != null) { - DYNAMIC_TOPICS.stream() - .filter(topic::startsWith) - .findFirst() - .ifPresent(prefix -> consumerPropertiesPerTopic.getOrDefault(prefix, Collections.emptyList()) - .forEach(kv -> props.put(kv.getKey(), kv.getValue()))); + List properties = consumerPropertiesPerTopic.get(topic); + if (properties == null) { + for (Map.Entry> entry : consumerPropertiesPerTopic.entrySet()) { + if (topic.startsWith(entry.getKey())) { + properties = entry.getValue(); + break; + } + } + } + if (properties != null) { + properties.forEach(kv -> props.put(kv.getKey(), kv.getValue())); + } } - return props; }