TbKafkaSettings - use startswith strategy for consumer props in case direct .get return null
This commit is contained in:
		
							parent
							
								
									c443b5e8c6
								
							
						
					
					
						commit
						68289044c5
					
				@ -62,7 +62,7 @@ public class KafkaEdgeTopicsCleanUpService extends AbstractCleanUpService {
 | 
			
		||||
    @Value("${sql.ttl.edge_events.edge_events_ttl:2628000}")
 | 
			
		||||
    private long ttlSeconds;
 | 
			
		||||
 | 
			
		||||
    @Value("${queue.edge.event_notifications_topic:tb_edge_event.notifications}")
 | 
			
		||||
    @Value("${queue.edge.event-notifications-topic:tb_edge_event.notifications}")
 | 
			
		||||
    private String tbEdgeEventNotificationsTopic;
 | 
			
		||||
 | 
			
		||||
    public KafkaEdgeTopicsCleanUpService(PartitionService partitionService, EdgeService edgeService,
 | 
			
		||||
 | 
			
		||||
@ -32,22 +32,22 @@ public class TopicService {
 | 
			
		||||
    @Value("${queue.prefix:}")
 | 
			
		||||
    private String prefix;
 | 
			
		||||
 | 
			
		||||
    @Value("${queue.core.notifications_topic:tb_core.notifications}")
 | 
			
		||||
    @Value("${queue.core.notifications-topic:tb_core.notifications}")
 | 
			
		||||
    private String tbCoreNotificationsTopic;
 | 
			
		||||
 | 
			
		||||
    @Value("${queue.rule-engine.notifications_topic:tb_rule_engine.notifications}")
 | 
			
		||||
    @Value("${queue.rule-engine.notifications-topic:tb_rule_engine.notifications}")
 | 
			
		||||
    private String tbRuleEngineNotificationsTopic;
 | 
			
		||||
 | 
			
		||||
    @Value("${queue.transport.notifications_topics:tb_transport.notifications}")
 | 
			
		||||
    @Value("${queue.transport.notifications-topic:tb_transport.notifications}")
 | 
			
		||||
    private String tbTransportNotificationsTopic;
 | 
			
		||||
 | 
			
		||||
    @Value("${queue.edge.notifications_topic:tb_edge.notifications}")
 | 
			
		||||
    @Value("${queue.edge.notifications-topic:tb_edge.notifications}")
 | 
			
		||||
    private String tbEdgeNotificationsTopic;
 | 
			
		||||
 | 
			
		||||
    @Value("${queue.edge.event_notifications_topic:tb_edge_event.notifications}")
 | 
			
		||||
    @Value("${queue.edge.event-notifications-topic:tb_edge_event.notifications}")
 | 
			
		||||
    private String tbEdgeEventNotificationsTopic;
 | 
			
		||||
 | 
			
		||||
    @Value("${queue.calculated-fields.notifications_topic:calculated_field.notifications}")
 | 
			
		||||
    @Value("${queue.calculated-fields.notifications-topic:calculated_field.notifications}")
 | 
			
		||||
    private String tbCalculatedFieldNotificationsTopic;
 | 
			
		||||
 | 
			
		||||
    private final ConcurrentMap<String, TopicPartitionInfo> tbCoreNotificationTopics = new ConcurrentHashMap<>();
 | 
			
		||||
 | 
			
		||||
@ -50,8 +50,6 @@ import java.util.Properties;
 | 
			
		||||
@Component
 | 
			
		||||
public class TbKafkaSettings {
 | 
			
		||||
 | 
			
		||||
    private static final List<String> DYNAMIC_TOPICS = List.of("tb_edge.notifications", "tb_edge_event.notifications");
 | 
			
		||||
 | 
			
		||||
    @Value("${queue.kafka.bootstrap.servers}")
 | 
			
		||||
    private String servers;
 | 
			
		||||
 | 
			
		||||
@ -163,18 +161,20 @@ public class TbKafkaSettings {
 | 
			
		||||
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
 | 
			
		||||
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
 | 
			
		||||
 | 
			
		||||
        consumerPropertiesPerTopic
 | 
			
		||||
                .getOrDefault(topic, Collections.emptyList())
 | 
			
		||||
                .forEach(kv -> props.put(kv.getKey(), kv.getValue()));
 | 
			
		||||
 | 
			
		||||
        if (topic != null) {
 | 
			
		||||
            DYNAMIC_TOPICS.stream()
 | 
			
		||||
                    .filter(topic::startsWith)
 | 
			
		||||
                    .findFirst()
 | 
			
		||||
                    .ifPresent(prefix -> consumerPropertiesPerTopic.getOrDefault(prefix, Collections.emptyList())
 | 
			
		||||
                            .forEach(kv -> props.put(kv.getKey(), kv.getValue())));
 | 
			
		||||
            List<TbProperty> properties = consumerPropertiesPerTopic.get(topic);
 | 
			
		||||
            if (properties == null) {
 | 
			
		||||
                for (Map.Entry<String, List<TbProperty>> entry : consumerPropertiesPerTopic.entrySet()) {
 | 
			
		||||
                    if (topic.startsWith(entry.getKey())) {
 | 
			
		||||
                        properties = entry.getValue();
 | 
			
		||||
                        break;
 | 
			
		||||
                    }
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
            if (properties != null) {
 | 
			
		||||
                properties.forEach(kv -> props.put(kv.getKey(), kv.getValue()));
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        return props;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user