added consumerPropertiesPerTopic to transport yml
This commit is contained in:
		
							parent
							
								
									2658de715a
								
							
						
					
					
						commit
						b82525ae86
					
				@ -19,6 +19,7 @@ import lombok.Getter;
 | 
			
		||||
import org.springframework.beans.factory.annotation.Value;
 | 
			
		||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 | 
			
		||||
import org.springframework.stereotype.Component;
 | 
			
		||||
import org.thingsboard.server.common.data.StringUtils;
 | 
			
		||||
 | 
			
		||||
import javax.annotation.PostConstruct;
 | 
			
		||||
import java.util.HashMap;
 | 
			
		||||
@ -37,7 +38,7 @@ public class TbKafkaTopicConfigs {
 | 
			
		||||
    private String notificationsProperties;
 | 
			
		||||
    @Value("${queue.kafka.topic-properties.js-executor}")
 | 
			
		||||
    private String jsExecutorProperties;
 | 
			
		||||
    @Value("${queue.kafka.topic-properties.fw-updates}")
 | 
			
		||||
    @Value("${queue.kafka.topic-properties.fw-updates:}")
 | 
			
		||||
    private String fwUpdatesProperties;
 | 
			
		||||
 | 
			
		||||
    @Getter
 | 
			
		||||
@ -65,11 +66,13 @@ public class TbKafkaTopicConfigs {
 | 
			
		||||
 | 
			
		||||
    private Map<String, String> getConfigs(String properties) {
 | 
			
		||||
        Map<String, String> configs = new HashMap<>();
 | 
			
		||||
        for (String property : properties.split(";")) {
 | 
			
		||||
            int delimiterPosition = property.indexOf(":");
 | 
			
		||||
            String key = property.substring(0, delimiterPosition);
 | 
			
		||||
            String value = property.substring(delimiterPosition + 1);
 | 
			
		||||
            configs.put(key, value);
 | 
			
		||||
        if (StringUtils.isNotEmpty(properties)) {
 | 
			
		||||
            for (String property : properties.split(";")) {
 | 
			
		||||
                int delimiterPosition = property.indexOf(":");
 | 
			
		||||
                String key = property.substring(0, delimiterPosition);
 | 
			
		||||
                String value = property.substring(delimiterPosition + 1);
 | 
			
		||||
                configs.put(key, value);
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
        return configs;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -136,6 +136,10 @@ queue:
 | 
			
		||||
      sasl.mechanism: "${TB_QUEUE_KAFKA_CONFLUENT_SASL_MECHANISM:PLAIN}"
 | 
			
		||||
      sasl.config: "${TB_QUEUE_KAFKA_CONFLUENT_SASL_JAAS_CONFIG:org.apache.kafka.common.security.plain.PlainLoginModule required username=\"CLUSTER_API_KEY\" password=\"CLUSTER_API_SECRET\";}"
 | 
			
		||||
      security.protocol: "${TB_QUEUE_KAFKA_CONFLUENT_SECURITY_PROTOCOL:SASL_SSL}"
 | 
			
		||||
    consumerPropertiesPerTopic:
 | 
			
		||||
      tb_firmware:
 | 
			
		||||
        - key: max.poll.records
 | 
			
		||||
          value: 10
 | 
			
		||||
    other:
 | 
			
		||||
    topic-properties:
 | 
			
		||||
      rule-engine: "${TB_QUEUE_KAFKA_RE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
 | 
			
		||||
 | 
			
		||||
@ -108,6 +108,10 @@ queue:
 | 
			
		||||
      sasl.mechanism: "${TB_QUEUE_KAFKA_CONFLUENT_SASL_MECHANISM:PLAIN}"
 | 
			
		||||
      sasl.config: "${TB_QUEUE_KAFKA_CONFLUENT_SASL_JAAS_CONFIG:org.apache.kafka.common.security.plain.PlainLoginModule required username=\"CLUSTER_API_KEY\" password=\"CLUSTER_API_SECRET\";}"
 | 
			
		||||
      security.protocol: "${TB_QUEUE_KAFKA_CONFLUENT_SECURITY_PROTOCOL:SASL_SSL}"
 | 
			
		||||
    consumerPropertiesPerTopic:
 | 
			
		||||
      tb_firmware:
 | 
			
		||||
        - key: max.poll.records
 | 
			
		||||
          value: 10
 | 
			
		||||
    other:
 | 
			
		||||
    topic-properties:
 | 
			
		||||
      rule-engine: "${TB_QUEUE_KAFKA_RE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
 | 
			
		||||
 | 
			
		||||
@ -174,6 +174,10 @@ queue:
 | 
			
		||||
      sasl.mechanism: "${TB_QUEUE_KAFKA_CONFLUENT_SASL_MECHANISM:PLAIN}"
 | 
			
		||||
      sasl.config: "${TB_QUEUE_KAFKA_CONFLUENT_SASL_JAAS_CONFIG:org.apache.kafka.common.security.plain.PlainLoginModule required username=\"CLUSTER_API_KEY\" password=\"CLUSTER_API_SECRET\";}"
 | 
			
		||||
      security.protocol: "${TB_QUEUE_KAFKA_CONFLUENT_SECURITY_PROTOCOL:SASL_SSL}"
 | 
			
		||||
    consumerPropertiesPerTopic:
 | 
			
		||||
      tb_firmware:
 | 
			
		||||
        - key: max.poll.records
 | 
			
		||||
          value: 10
 | 
			
		||||
    other:
 | 
			
		||||
    topic-properties:
 | 
			
		||||
      rule-engine: "${TB_QUEUE_KAFKA_RE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1}"
 | 
			
		||||
 | 
			
		||||
@ -136,6 +136,10 @@ queue:
 | 
			
		||||
      sasl.mechanism: "${TB_QUEUE_KAFKA_CONFLUENT_SASL_MECHANISM:PLAIN}"
 | 
			
		||||
      sasl.config: "${TB_QUEUE_KAFKA_CONFLUENT_SASL_JAAS_CONFIG:org.apache.kafka.common.security.plain.PlainLoginModule required username=\"CLUSTER_API_KEY\" password=\"CLUSTER_API_SECRET\";}"
 | 
			
		||||
      security.protocol: "${TB_QUEUE_KAFKA_CONFLUENT_SECURITY_PROTOCOL:SASL_SSL}"
 | 
			
		||||
    consumerPropertiesPerTopic:
 | 
			
		||||
      tb_firmware:
 | 
			
		||||
        - key: max.poll.records
 | 
			
		||||
          value: 10
 | 
			
		||||
    other:
 | 
			
		||||
    topic-properties:
 | 
			
		||||
      rule-engine: "${TB_QUEUE_KAFKA_RE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user