Merge pull request #8384 from smatvienko-tb/feature/kafka_properties_refactor
[3.5] Kafka other properties inline
This commit is contained in:
		
						commit
						08375e613b
					
				@ -1018,6 +1018,8 @@ queue:
 | 
			
		||||
    max_poll_records: "${TB_QUEUE_KAFKA_MAX_POLL_RECORDS:8192}"
 | 
			
		||||
    max_partition_fetch_bytes: "${TB_QUEUE_KAFKA_MAX_PARTITION_FETCH_BYTES:16777216}"
 | 
			
		||||
    fetch_max_bytes: "${TB_QUEUE_KAFKA_FETCH_MAX_BYTES:134217728}"
 | 
			
		||||
    request.timeout.ms: "${TB_QUEUE_KAFKA_REQUEST_TIMEOUT_MS:30000}" # (30 seconds) # refer to https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#producerconfigs_request.timeout.ms
 | 
			
		||||
    session.timeout.ms: "${TB_QUEUE_KAFKA_SESSION_TIMEOUT_MS:10000}" # (10 seconds) # refer to https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#consumerconfigs_session.timeout.ms
 | 
			
		||||
    use_confluent_cloud: "${TB_QUEUE_KAFKA_USE_CONFLUENT_CLOUD:false}"
 | 
			
		||||
    confluent:
 | 
			
		||||
      ssl.algorithm: "${TB_QUEUE_KAFKA_CONFLUENT_SSL_ALGORITHM:https}"
 | 
			
		||||
@ -1036,11 +1038,12 @@ queue:
 | 
			
		||||
    #      tb_rule_engine.sq:
 | 
			
		||||
    #        - key: max.poll.records
 | 
			
		||||
    #          value: "${TB_QUEUE_KAFKA_SQ_MAX_POLL_RECORDS:1024}"
 | 
			
		||||
    other: # In this section you can specify custom parameters for Kafka consumer/producer and expose the env variables to configure outside
 | 
			
		||||
      - key: "request.timeout.ms" # refer to https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#producerconfigs_request.timeout.ms
 | 
			
		||||
        value: "${TB_QUEUE_KAFKA_REQUEST_TIMEOUT_MS:30000}" # (30 seconds)
 | 
			
		||||
      - key: "session.timeout.ms" # refer to https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#consumerconfigs_session.timeout.ms
 | 
			
		||||
        value: "${TB_QUEUE_KAFKA_SESSION_TIMEOUT_MS:10000}" # (10 seconds)
 | 
			
		||||
    other-inline: "${TB_QUEUE_KAFKA_OTHER_PROPERTIES:}" # In this section you can specify custom parameters (semicolon separated) for Kafka consumer/producer/admin # Example "metrics.recording.level:INFO;metrics.sample.window.ms:30000"
 | 
			
		||||
    other: # DEPRECATED. In this section you can specify custom parameters for Kafka consumer/producer and expose the env variables to configure outside
 | 
			
		||||
    #  - key: "request.timeout.ms" # refer to https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#producerconfigs_request.timeout.ms
 | 
			
		||||
    #    value: "${TB_QUEUE_KAFKA_REQUEST_TIMEOUT_MS:30000}" # (30 seconds)
 | 
			
		||||
    #  - key: "session.timeout.ms" # refer to https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#consumerconfigs_session.timeout.ms
 | 
			
		||||
    #    value: "${TB_QUEUE_KAFKA_SESSION_TIMEOUT_MS:10000}" # (10 seconds)
 | 
			
		||||
    topic-properties:
 | 
			
		||||
      rule-engine: "${TB_QUEUE_KAFKA_RE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
 | 
			
		||||
      core: "${TB_QUEUE_KAFKA_CORE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
 | 
			
		||||
 | 
			
		||||
@ -19,10 +19,9 @@ import lombok.Getter;
 | 
			
		||||
import org.springframework.beans.factory.annotation.Value;
 | 
			
		||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
 | 
			
		||||
import org.springframework.stereotype.Component;
 | 
			
		||||
import org.thingsboard.server.common.data.StringUtils;
 | 
			
		||||
import org.thingsboard.server.queue.util.PropertyUtils;
 | 
			
		||||
 | 
			
		||||
import javax.annotation.PostConstruct;
 | 
			
		||||
import java.util.HashMap;
 | 
			
		||||
import java.util.Map;
 | 
			
		||||
 | 
			
		||||
@Component
 | 
			
		||||
@ -55,24 +54,12 @@ public class TbServiceBusQueueConfigs {
 | 
			
		||||
 | 
			
		||||
    @PostConstruct
 | 
			
		||||
    private void init() {
 | 
			
		||||
        coreConfigs = getConfigs(coreProperties);
 | 
			
		||||
        ruleEngineConfigs = getConfigs(ruleEngineProperties);
 | 
			
		||||
        transportApiConfigs = getConfigs(transportApiProperties);
 | 
			
		||||
        notificationsConfigs = getConfigs(notificationsProperties);
 | 
			
		||||
        jsExecutorConfigs = getConfigs(jsExecutorProperties);
 | 
			
		||||
        vcConfigs = getConfigs(vcProperties);
 | 
			
		||||
        coreConfigs = PropertyUtils.getProps(coreProperties);
 | 
			
		||||
        ruleEngineConfigs = PropertyUtils.getProps(ruleEngineProperties);
 | 
			
		||||
        transportApiConfigs = PropertyUtils.getProps(transportApiProperties);
 | 
			
		||||
        notificationsConfigs = PropertyUtils.getProps(notificationsProperties);
 | 
			
		||||
        jsExecutorConfigs = PropertyUtils.getProps(jsExecutorProperties);
 | 
			
		||||
        vcConfigs = PropertyUtils.getProps(vcProperties);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private Map<String, String> getConfigs(String properties) {
 | 
			
		||||
        Map<String, String> configs = new HashMap<>();
 | 
			
		||||
        if (StringUtils.isNotEmpty(properties)) {
 | 
			
		||||
            for (String property : properties.split(";")) {
 | 
			
		||||
                int delimiterPosition = property.indexOf(":");
 | 
			
		||||
                String key = property.substring(0, delimiterPosition);
 | 
			
		||||
                String value = property.substring(delimiterPosition + 1);
 | 
			
		||||
                configs.put(key, value);
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
        return configs;
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -32,6 +32,7 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 | 
			
		||||
import org.springframework.boot.context.properties.ConfigurationProperties;
 | 
			
		||||
import org.springframework.stereotype.Component;
 | 
			
		||||
import org.thingsboard.server.common.data.TbProperty;
 | 
			
		||||
import org.thingsboard.server.queue.util.PropertyUtils;
 | 
			
		||||
 | 
			
		||||
import java.util.Collections;
 | 
			
		||||
import java.util.List;
 | 
			
		||||
@ -53,34 +54,34 @@ public class TbKafkaSettings {
 | 
			
		||||
    @Value("${queue.kafka.ssl.enabled:false}")
 | 
			
		||||
    private boolean sslEnabled;
 | 
			
		||||
 | 
			
		||||
    @Value("${queue.kafka.ssl.truststore.location}")
 | 
			
		||||
    @Value("${queue.kafka.ssl.truststore.location:}")
 | 
			
		||||
    private String sslTruststoreLocation;
 | 
			
		||||
 | 
			
		||||
    @Value("${queue.kafka.ssl.truststore.password}")
 | 
			
		||||
    @Value("${queue.kafka.ssl.truststore.password:}")
 | 
			
		||||
    private String sslTruststorePassword;
 | 
			
		||||
 | 
			
		||||
    @Value("${queue.kafka.ssl.keystore.location}")
 | 
			
		||||
    @Value("${queue.kafka.ssl.keystore.location:}")
 | 
			
		||||
    private String sslKeystoreLocation;
 | 
			
		||||
 | 
			
		||||
    @Value("${queue.kafka.ssl.keystore.password}")
 | 
			
		||||
    @Value("${queue.kafka.ssl.keystore.password:}")
 | 
			
		||||
    private String sslKeystorePassword;
 | 
			
		||||
 | 
			
		||||
    @Value("${queue.kafka.ssl.key.password}")
 | 
			
		||||
    @Value("${queue.kafka.ssl.key.password:}")
 | 
			
		||||
    private String sslKeyPassword;
 | 
			
		||||
 | 
			
		||||
    @Value("${queue.kafka.acks}")
 | 
			
		||||
    @Value("${queue.kafka.acks:all}")
 | 
			
		||||
    private String acks;
 | 
			
		||||
 | 
			
		||||
    @Value("${queue.kafka.retries}")
 | 
			
		||||
    @Value("${queue.kafka.retries:1}")
 | 
			
		||||
    private int retries;
 | 
			
		||||
 | 
			
		||||
    @Value("${queue.kafka.compression.type:none}")
 | 
			
		||||
    private String compressionType;
 | 
			
		||||
 | 
			
		||||
    @Value("${queue.kafka.batch.size}")
 | 
			
		||||
    @Value("${queue.kafka.batch.size:16384}")
 | 
			
		||||
    private int batchSize;
 | 
			
		||||
 | 
			
		||||
    @Value("${queue.kafka.linger.ms}")
 | 
			
		||||
    @Value("${queue.kafka.linger.ms:1}")
 | 
			
		||||
    private long lingerMs;
 | 
			
		||||
 | 
			
		||||
    @Value("${queue.kafka.max.request.size:1048576}")
 | 
			
		||||
@ -89,10 +90,10 @@ public class TbKafkaSettings {
 | 
			
		||||
    @Value("${queue.kafka.max.in.flight.requests.per.connection:5}")
 | 
			
		||||
    private int maxInFlightRequestsPerConnection;
 | 
			
		||||
 | 
			
		||||
    @Value("${queue.kafka.buffer.memory}")
 | 
			
		||||
    @Value("${queue.kafka.buffer.memory:33554432}")
 | 
			
		||||
    private long bufferMemory;
 | 
			
		||||
 | 
			
		||||
    @Value("${queue.kafka.replication_factor}")
 | 
			
		||||
    @Value("${queue.kafka.replication_factor:1}")
 | 
			
		||||
    @Getter
 | 
			
		||||
    private short replicationFactor;
 | 
			
		||||
 | 
			
		||||
@ -108,21 +109,31 @@ public class TbKafkaSettings {
 | 
			
		||||
    @Value("${queue.kafka.fetch_max_bytes:134217728}")
 | 
			
		||||
    private int fetchMaxBytes;
 | 
			
		||||
 | 
			
		||||
    @Value("${queue.kafka.request.timeout.ms:30000}")
 | 
			
		||||
    private int requestTimeoutMs;
 | 
			
		||||
 | 
			
		||||
    @Value("${queue.kafka.session.timeout.ms:10000}")
 | 
			
		||||
    private int sessionTimeoutMs;
 | 
			
		||||
 | 
			
		||||
    @Value("${queue.kafka.use_confluent_cloud:false}")
 | 
			
		||||
    private boolean useConfluent;
 | 
			
		||||
 | 
			
		||||
    @Value("${queue.kafka.confluent.ssl.algorithm}")
 | 
			
		||||
    @Value("${queue.kafka.confluent.ssl.algorithm:}")
 | 
			
		||||
    private String sslAlgorithm;
 | 
			
		||||
 | 
			
		||||
    @Value("${queue.kafka.confluent.sasl.mechanism}")
 | 
			
		||||
    @Value("${queue.kafka.confluent.sasl.mechanism:}")
 | 
			
		||||
    private String saslMechanism;
 | 
			
		||||
 | 
			
		||||
    @Value("${queue.kafka.confluent.sasl.config}")
 | 
			
		||||
    @Value("${queue.kafka.confluent.sasl.config:}")
 | 
			
		||||
    private String saslConfig;
 | 
			
		||||
 | 
			
		||||
    @Value("${queue.kafka.confluent.security.protocol}")
 | 
			
		||||
    @Value("${queue.kafka.confluent.security.protocol:}")
 | 
			
		||||
    private String securityProtocol;
 | 
			
		||||
 | 
			
		||||
    @Value("${queue.kafka.other-inline:}")
 | 
			
		||||
    private String otherInline;
 | 
			
		||||
 | 
			
		||||
    @Deprecated
 | 
			
		||||
    @Setter
 | 
			
		||||
    private List<TbProperty> other;
 | 
			
		||||
 | 
			
		||||
@ -134,8 +145,6 @@ public class TbKafkaSettings {
 | 
			
		||||
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
 | 
			
		||||
        props.put(AdminClientConfig.RETRIES_CONFIG, retries);
 | 
			
		||||
 | 
			
		||||
        configureSSL(props);
 | 
			
		||||
 | 
			
		||||
        return props;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -147,8 +156,6 @@ public class TbKafkaSettings {
 | 
			
		||||
        props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, fetchMaxBytes);
 | 
			
		||||
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervalMs);
 | 
			
		||||
 | 
			
		||||
        configureSSL(props);
 | 
			
		||||
 | 
			
		||||
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
 | 
			
		||||
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
 | 
			
		||||
 | 
			
		||||
@ -174,7 +181,7 @@ public class TbKafkaSettings {
 | 
			
		||||
        return props;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private Properties toProps() {
 | 
			
		||||
    Properties toProps() {
 | 
			
		||||
        Properties props = new Properties();
 | 
			
		||||
 | 
			
		||||
        if (useConfluent) {
 | 
			
		||||
@ -184,6 +191,11 @@ public class TbKafkaSettings {
 | 
			
		||||
            props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol);
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        props.put(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG, requestTimeoutMs);
 | 
			
		||||
        props.put(CommonClientConfigs.SESSION_TIMEOUT_MS_CONFIG, sessionTimeoutMs);
 | 
			
		||||
 | 
			
		||||
        props.putAll(PropertyUtils.getProps(otherInline));
 | 
			
		||||
 | 
			
		||||
        if (other != null) {
 | 
			
		||||
            other.forEach(kv -> props.put(kv.getKey(), kv.getValue()));
 | 
			
		||||
        }
 | 
			
		||||
@ -193,7 +205,7 @@ public class TbKafkaSettings {
 | 
			
		||||
        return props;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void configureSSL(Properties props) {
 | 
			
		||||
    void configureSSL(Properties props) {
 | 
			
		||||
        if (sslEnabled) {
 | 
			
		||||
            props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
 | 
			
		||||
            props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, sslTruststoreLocation);
 | 
			
		||||
 | 
			
		||||
@ -19,10 +19,9 @@ import lombok.Getter;
 | 
			
		||||
import org.springframework.beans.factory.annotation.Value;
 | 
			
		||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 | 
			
		||||
import org.springframework.stereotype.Component;
 | 
			
		||||
import org.thingsboard.server.common.data.StringUtils;
 | 
			
		||||
import org.thingsboard.server.queue.util.PropertyUtils;
 | 
			
		||||
 | 
			
		||||
import javax.annotation.PostConstruct;
 | 
			
		||||
import java.util.HashMap;
 | 
			
		||||
import java.util.Map;
 | 
			
		||||
 | 
			
		||||
@Component
 | 
			
		||||
@ -66,29 +65,17 @@ public class TbKafkaTopicConfigs {
 | 
			
		||||
 | 
			
		||||
    @PostConstruct
 | 
			
		||||
    private void init() {
 | 
			
		||||
        coreConfigs = getConfigs(coreProperties);
 | 
			
		||||
        ruleEngineConfigs = getConfigs(ruleEngineProperties);
 | 
			
		||||
        transportApiRequestConfigs = getConfigs(transportApiProperties);
 | 
			
		||||
        transportApiResponseConfigs = getConfigs(transportApiProperties);
 | 
			
		||||
        coreConfigs = PropertyUtils.getProps(coreProperties);
 | 
			
		||||
        ruleEngineConfigs = PropertyUtils.getProps(ruleEngineProperties);
 | 
			
		||||
        transportApiRequestConfigs = PropertyUtils.getProps(transportApiProperties);
 | 
			
		||||
        transportApiResponseConfigs = PropertyUtils.getProps(transportApiProperties);
 | 
			
		||||
        transportApiResponseConfigs.put(NUM_PARTITIONS_SETTING, "1");
 | 
			
		||||
        notificationsConfigs = getConfigs(notificationsProperties);
 | 
			
		||||
        jsExecutorRequestConfigs = getConfigs(jsExecutorProperties);
 | 
			
		||||
        jsExecutorResponseConfigs = getConfigs(jsExecutorProperties);
 | 
			
		||||
        notificationsConfigs = PropertyUtils.getProps(notificationsProperties);
 | 
			
		||||
        jsExecutorRequestConfigs = PropertyUtils.getProps(jsExecutorProperties);
 | 
			
		||||
        jsExecutorResponseConfigs = PropertyUtils.getProps(jsExecutorProperties);
 | 
			
		||||
        jsExecutorResponseConfigs.put(NUM_PARTITIONS_SETTING, "1");
 | 
			
		||||
        fwUpdatesConfigs = getConfigs(fwUpdatesProperties);
 | 
			
		||||
        vcConfigs = getConfigs(vcProperties);
 | 
			
		||||
        fwUpdatesConfigs = PropertyUtils.getProps(fwUpdatesProperties);
 | 
			
		||||
        vcConfigs = PropertyUtils.getProps(vcProperties);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private Map<String, String> getConfigs(String properties) {
 | 
			
		||||
        Map<String, String> configs = new HashMap<>();
 | 
			
		||||
        if (StringUtils.isNotEmpty(properties)) {
 | 
			
		||||
            for (String property : properties.split(";")) {
 | 
			
		||||
                int delimiterPosition = property.indexOf(":");
 | 
			
		||||
                String key = property.substring(0, delimiterPosition);
 | 
			
		||||
                String value = property.substring(delimiterPosition + 1);
 | 
			
		||||
                configs.put(key, value);
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
        return configs;
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -19,10 +19,9 @@ import lombok.Getter;
 | 
			
		||||
import org.springframework.beans.factory.annotation.Value;
 | 
			
		||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
 | 
			
		||||
import org.springframework.stereotype.Component;
 | 
			
		||||
import org.thingsboard.server.common.data.StringUtils;
 | 
			
		||||
import org.thingsboard.server.queue.util.PropertyUtils;
 | 
			
		||||
 | 
			
		||||
import javax.annotation.PostConstruct;
 | 
			
		||||
import java.util.HashMap;
 | 
			
		||||
import java.util.Map;
 | 
			
		||||
 | 
			
		||||
@Component
 | 
			
		||||
@ -56,24 +55,12 @@ public class TbPubSubSubscriptionSettings {
 | 
			
		||||
 | 
			
		||||
    @PostConstruct
 | 
			
		||||
    private void init() {
 | 
			
		||||
        coreSettings = getSettings(coreProperties);
 | 
			
		||||
        ruleEngineSettings = getSettings(ruleEngineProperties);
 | 
			
		||||
        transportApiSettings = getSettings(transportApiProperties);
 | 
			
		||||
        notificationsSettings = getSettings(notificationsProperties);
 | 
			
		||||
        jsExecutorSettings = getSettings(jsExecutorProperties);
 | 
			
		||||
        vcSettings = getSettings(vcProperties);
 | 
			
		||||
        coreSettings = PropertyUtils.getProps(coreProperties);
 | 
			
		||||
        ruleEngineSettings = PropertyUtils.getProps(ruleEngineProperties);
 | 
			
		||||
        transportApiSettings = PropertyUtils.getProps(transportApiProperties);
 | 
			
		||||
        notificationsSettings = PropertyUtils.getProps(notificationsProperties);
 | 
			
		||||
        jsExecutorSettings = PropertyUtils.getProps(jsExecutorProperties);
 | 
			
		||||
        vcSettings = PropertyUtils.getProps(vcProperties);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private Map<String, String> getSettings(String properties) {
 | 
			
		||||
        Map<String, String> configs = new HashMap<>();
 | 
			
		||||
        if (StringUtils.isNotEmpty(properties)) {
 | 
			
		||||
            for (String property : properties.split(";")) {
 | 
			
		||||
                int delimiterPosition = property.indexOf(":");
 | 
			
		||||
                String key = property.substring(0, delimiterPosition);
 | 
			
		||||
                String value = property.substring(delimiterPosition + 1);
 | 
			
		||||
                configs.put(key, value);
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
        return configs;
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -0,0 +1,40 @@
 | 
			
		||||
/**
 | 
			
		||||
 * Copyright © 2016-2023 The Thingsboard Authors
 | 
			
		||||
 *
 | 
			
		||||
 * Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
 * you may not use this file except in compliance with the License.
 | 
			
		||||
 * You may obtain a copy of the License at
 | 
			
		||||
 *
 | 
			
		||||
 *     http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 *
 | 
			
		||||
 * Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
 * See the License for the specific language governing permissions and
 | 
			
		||||
 * limitations under the License.
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.server.queue.util;
 | 
			
		||||
 | 
			
		||||
import org.thingsboard.server.common.data.StringUtils;
 | 
			
		||||
 | 
			
		||||
import java.util.HashMap;
 | 
			
		||||
import java.util.Map;
 | 
			
		||||
 | 
			
		||||
public class PropertyUtils {
 | 
			
		||||
 | 
			
		||||
    public static Map<String, String> getProps(String properties) {
 | 
			
		||||
        Map<String, String> configs = new HashMap<>();
 | 
			
		||||
        if (StringUtils.isNotEmpty(properties)) {
 | 
			
		||||
            for (String property : properties.split(";")) {
 | 
			
		||||
                if (StringUtils.isNotEmpty(property)) {
 | 
			
		||||
                    int delimiterPosition = property.indexOf(":");
 | 
			
		||||
                    String key = property.substring(0, delimiterPosition);
 | 
			
		||||
                    String value = property.substring(delimiterPosition + 1);
 | 
			
		||||
                    configs.put(key, value);
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
        return configs;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
@ -0,0 +1,83 @@
 | 
			
		||||
/**
 | 
			
		||||
 * Copyright © 2016-2023 The Thingsboard Authors
 | 
			
		||||
 *
 | 
			
		||||
 * Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
 * you may not use this file except in compliance with the License.
 | 
			
		||||
 * You may obtain a copy of the License at
 | 
			
		||||
 *
 | 
			
		||||
 *     http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 *
 | 
			
		||||
 * Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
 * See the License for the specific language governing permissions and
 | 
			
		||||
 * limitations under the License.
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.server.queue.kafka;
 | 
			
		||||
 | 
			
		||||
import org.junit.jupiter.api.BeforeEach;
 | 
			
		||||
import org.junit.jupiter.api.Test;
 | 
			
		||||
import org.mockito.Mockito;
 | 
			
		||||
import org.springframework.beans.factory.annotation.Autowired;
 | 
			
		||||
import org.springframework.boot.test.context.SpringBootTest;
 | 
			
		||||
import org.springframework.test.context.TestPropertySource;
 | 
			
		||||
 | 
			
		||||
import java.util.Properties;
 | 
			
		||||
 | 
			
		||||
import static org.assertj.core.api.Assertions.assertThat;
 | 
			
		||||
import static org.mockito.ArgumentMatchers.any;
 | 
			
		||||
import static org.mockito.Mockito.spy;
 | 
			
		||||
 | 
			
		||||
@SpringBootTest(classes = TbKafkaSettings.class)
 | 
			
		||||
@TestPropertySource(properties = {
 | 
			
		||||
        "queue.type=kafka",
 | 
			
		||||
        "queue.kafka.bootstrap.servers=localhost:9092",
 | 
			
		||||
        "queue.kafka.other-inline=metrics.recording.level:INFO;metrics.sample.window.ms:30000",
 | 
			
		||||
})
 | 
			
		||||
class TbKafkaSettingsTest {
 | 
			
		||||
 | 
			
		||||
    @Autowired
 | 
			
		||||
    TbKafkaSettings settings;
 | 
			
		||||
 | 
			
		||||
    @BeforeEach
 | 
			
		||||
    void beforeEach() {
 | 
			
		||||
        settings = spy(settings); //SpyBean is not aware on @ConditionalOnProperty, that is why the traditional spy in use
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    void givenToProps_whenConfigureSSL_thenVerifyOnce() {
 | 
			
		||||
        Properties props = settings.toProps();
 | 
			
		||||
 | 
			
		||||
        assertThat(props).as("TB_QUEUE_KAFKA_REQUEST_TIMEOUT_MS").containsEntry("request.timeout.ms", 30000);
 | 
			
		||||
        assertThat(props).as("TB_QUEUE_KAFKA_SESSION_TIMEOUT_MS").containsEntry("session.timeout.ms", 10000);
 | 
			
		||||
 | 
			
		||||
        //other-inline
 | 
			
		||||
        assertThat(props).as("metrics.recording.level").containsEntry("metrics.recording.level", "INFO");
 | 
			
		||||
        assertThat(props).as("TB_QUEUE_KAFKA_SESSION_TIMEOUT_MS").containsEntry("metrics.sample.window.ms", "30000");
 | 
			
		||||
 | 
			
		||||
        Mockito.verify(settings).toProps();
 | 
			
		||||
        Mockito.verify(settings).configureSSL(any());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    void givenToAdminProps_whenConfigureSSL_thenVerifyOnce() {
 | 
			
		||||
        settings.toAdminProps();
 | 
			
		||||
        Mockito.verify(settings).toProps();
 | 
			
		||||
        Mockito.verify(settings).configureSSL(any());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    void givenToConsumerProps_whenConfigureSSL_thenVerifyOnce() {
 | 
			
		||||
        settings.toConsumerProps("main");
 | 
			
		||||
        Mockito.verify(settings).toProps();
 | 
			
		||||
        Mockito.verify(settings).configureSSL(any());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    void givenTotoProducerProps_whenConfigureSSL_thenVerifyOnce() {
 | 
			
		||||
        settings.toProducerProps();
 | 
			
		||||
        Mockito.verify(settings).toProps();
 | 
			
		||||
        Mockito.verify(settings).configureSSL(any());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
@ -0,0 +1,62 @@
 | 
			
		||||
/**
 | 
			
		||||
 * Copyright © 2016-2023 The Thingsboard Authors
 | 
			
		||||
 *
 | 
			
		||||
 * Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
 * you may not use this file except in compliance with the License.
 | 
			
		||||
 * You may obtain a copy of the License at
 | 
			
		||||
 *
 | 
			
		||||
 *     http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 *
 | 
			
		||||
 * Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
 * See the License for the specific language governing permissions and
 | 
			
		||||
 * limitations under the License.
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.server.queue.util;
 | 
			
		||||
 | 
			
		||||
import org.junit.jupiter.api.Test;
 | 
			
		||||
 | 
			
		||||
import java.util.Map;
 | 
			
		||||
 | 
			
		||||
import static org.assertj.core.api.Assertions.assertThat;
 | 
			
		||||
 | 
			
		||||
class PropertyUtilsTest {
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    void givenNullOrEmpty_whenGetConfig_thenEmptyMap() {
 | 
			
		||||
        assertThat(PropertyUtils.getProps(null)).as("null property").isEmpty();
 | 
			
		||||
        assertThat(PropertyUtils.getProps("")).as("empty property").isEmpty();
 | 
			
		||||
        assertThat(PropertyUtils.getProps(";")).as("ends with ;").isEmpty();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    void givenKafkaOtherProperties_whenGetConfig_thenReturnMappedValues() {
 | 
			
		||||
        assertThat(PropertyUtils.getProps("metrics.recording.level:INFO;metrics.sample.window.ms:30000"))
 | 
			
		||||
                .as("two pairs")
 | 
			
		||||
                .isEqualTo(Map.of(
 | 
			
		||||
                        "metrics.recording.level", "INFO",
 | 
			
		||||
                        "metrics.sample.window.ms", "30000"
 | 
			
		||||
                ));
 | 
			
		||||
 | 
			
		||||
        assertThat(PropertyUtils.getProps("metrics.recording.level:INFO;metrics.sample.window.ms:30000" + ";"))
 | 
			
		||||
                .as("two pairs ends with ;")
 | 
			
		||||
                .isEqualTo(Map.of(
 | 
			
		||||
                        "metrics.recording.level", "INFO",
 | 
			
		||||
                        "metrics.sample.window.ms", "30000"
 | 
			
		||||
                ));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    void givenKafkaTopicProperties_whenGetConfig_thenReturnMappedValues() {
 | 
			
		||||
        assertThat(PropertyUtils.getProps("retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1"))
 | 
			
		||||
                .isEqualTo(Map.of(
 | 
			
		||||
                        "retention.ms", "604800000",
 | 
			
		||||
                        "segment.bytes", "26214400",
 | 
			
		||||
                        "retention.bytes", "1048576000",
 | 
			
		||||
                        "partitions", "1",
 | 
			
		||||
                        "min.insync.replicas", "1"
 | 
			
		||||
                ));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
@ -70,6 +70,8 @@ queue:
 | 
			
		||||
    max_poll_records: "${TB_QUEUE_KAFKA_MAX_POLL_RECORDS:8192}"
 | 
			
		||||
    max_partition_fetch_bytes: "${TB_QUEUE_KAFKA_MAX_PARTITION_FETCH_BYTES:16777216}"
 | 
			
		||||
    fetch_max_bytes: "${TB_QUEUE_KAFKA_FETCH_MAX_BYTES:134217728}"
 | 
			
		||||
    request.timeout.ms: "${TB_QUEUE_KAFKA_REQUEST_TIMEOUT_MS:30000}" # (30 seconds) # refer to https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#producerconfigs_request.timeout.ms
 | 
			
		||||
    session.timeout.ms: "${TB_QUEUE_KAFKA_SESSION_TIMEOUT_MS:10000}" # (10 seconds) # refer to https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#consumerconfigs_session.timeout.ms
 | 
			
		||||
    use_confluent_cloud: "${TB_QUEUE_KAFKA_USE_CONFLUENT_CLOUD:false}"
 | 
			
		||||
    confluent:
 | 
			
		||||
      ssl.algorithm: "${TB_QUEUE_KAFKA_CONFLUENT_SSL_ALGORITHM:https}"
 | 
			
		||||
@ -88,11 +90,12 @@ queue:
 | 
			
		||||
    #      tb_rule_engine.sq:
 | 
			
		||||
    #        - key: max.poll.records
 | 
			
		||||
    #          value: "${TB_QUEUE_KAFKA_SQ_MAX_POLL_RECORDS:1024}"
 | 
			
		||||
    other: # In this section you can specify custom parameters for Kafka consumer/producer and expose the env variables to configure outside
 | 
			
		||||
      - key: "request.timeout.ms" # refer to https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#producerconfigs_request.timeout.ms
 | 
			
		||||
        value: "${TB_QUEUE_KAFKA_REQUEST_TIMEOUT_MS:30000}" # (30 seconds)
 | 
			
		||||
      - key: "session.timeout.ms" # refer to https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#consumerconfigs_session.timeout.ms
 | 
			
		||||
        value: "${TB_QUEUE_KAFKA_SESSION_TIMEOUT_MS:10000}" # (10 seconds)
 | 
			
		||||
    other-inline: "${TB_QUEUE_KAFKA_OTHER_PROPERTIES:}" # In this section you can specify custom parameters (semicolon separated) for Kafka consumer/producer/admin # Example "metrics.recording.level:INFO;metrics.sample.window.ms:30000"
 | 
			
		||||
    other: # DEPRECATED. In this section you can specify custom parameters for Kafka consumer/producer and expose the env variables to configure outside
 | 
			
		||||
    #  - key: "request.timeout.ms" # refer to https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#producerconfigs_request.timeout.ms
 | 
			
		||||
    #    value: "${TB_QUEUE_KAFKA_REQUEST_TIMEOUT_MS:30000}" # (30 seconds)
 | 
			
		||||
    #  - key: "session.timeout.ms" # refer to https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#consumerconfigs_session.timeout.ms
 | 
			
		||||
    #    value: "${TB_QUEUE_KAFKA_SESSION_TIMEOUT_MS:10000}" # (10 seconds)
 | 
			
		||||
    topic-properties:
 | 
			
		||||
      core: "${TB_QUEUE_KAFKA_CORE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
 | 
			
		||||
      notifications: "${TB_QUEUE_KAFKA_NOTIFICATIONS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
 | 
			
		||||
 | 
			
		||||
@ -167,17 +167,24 @@ queue:
 | 
			
		||||
    max.in.flight.requests.per.connection: "${TB_KAFKA_MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION:5}"
 | 
			
		||||
    buffer.memory: "${TB_BUFFER_MEMORY:33554432}"
 | 
			
		||||
    replication_factor: "${TB_QUEUE_KAFKA_REPLICATION_FACTOR:1}"
 | 
			
		||||
    max_poll_interval_ms: "${TB_QUEUE_KAFKA_MAX_POLL_INTERVAL_MS:300000}"
 | 
			
		||||
    max_poll_records: "${TB_QUEUE_KAFKA_MAX_POLL_RECORDS:8192}"
 | 
			
		||||
    max_partition_fetch_bytes: "${TB_QUEUE_KAFKA_MAX_PARTITION_FETCH_BYTES:16777216}"
 | 
			
		||||
    fetch_max_bytes: "${TB_QUEUE_KAFKA_FETCH_MAX_BYTES:134217728}"
 | 
			
		||||
    request.timeout.ms: "${TB_QUEUE_KAFKA_REQUEST_TIMEOUT_MS:30000}" # (30 seconds) # refer to https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#producerconfigs_request.timeout.ms
 | 
			
		||||
    session.timeout.ms: "${TB_QUEUE_KAFKA_SESSION_TIMEOUT_MS:10000}" # (10 seconds) # refer to https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#consumerconfigs_session.timeout.ms
 | 
			
		||||
    use_confluent_cloud: "${TB_QUEUE_KAFKA_USE_CONFLUENT_CLOUD:false}"
 | 
			
		||||
    confluent:
 | 
			
		||||
      ssl.algorithm: "${TB_QUEUE_KAFKA_CONFLUENT_SSL_ALGORITHM:https}"
 | 
			
		||||
      sasl.mechanism: "${TB_QUEUE_KAFKA_CONFLUENT_SASL_MECHANISM:PLAIN}"
 | 
			
		||||
      sasl.config: "${TB_QUEUE_KAFKA_CONFLUENT_SASL_JAAS_CONFIG:org.apache.kafka.common.security.plain.PlainLoginModule required username=\"CLUSTER_API_KEY\" password=\"CLUSTER_API_SECRET\";}"
 | 
			
		||||
      security.protocol: "${TB_QUEUE_KAFKA_CONFLUENT_SECURITY_PROTOCOL:SASL_SSL}"
 | 
			
		||||
    other: # In this section you can specify custom parameters for Kafka consumer/producer and expose the env variables to configure outside
 | 
			
		||||
      - key: "request.timeout.ms" # refer to https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#producerconfigs_request.timeout.ms
 | 
			
		||||
        value: "${TB_QUEUE_KAFKA_REQUEST_TIMEOUT_MS:30000}" # (30 seconds)
 | 
			
		||||
      - key: "session.timeout.ms" # refer to https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#consumerconfigs_session.timeout.ms
 | 
			
		||||
        value: "${TB_QUEUE_KAFKA_SESSION_TIMEOUT_MS:10000}" # (10 seconds)
 | 
			
		||||
    other-inline: "${TB_QUEUE_KAFKA_OTHER_PROPERTIES:}" # In this section you can specify custom parameters (semicolon separated) for Kafka consumer/producer/admin # Example "metrics.recording.level:INFO;metrics.sample.window.ms:30000"
 | 
			
		||||
    other: # DEPRECATED. In this section you can specify custom parameters for Kafka consumer/producer and expose the env variables to configure outside
 | 
			
		||||
    #  - key: "request.timeout.ms" # refer to https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#producerconfigs_request.timeout.ms
 | 
			
		||||
    #    value: "${TB_QUEUE_KAFKA_REQUEST_TIMEOUT_MS:30000}" # (30 seconds)
 | 
			
		||||
    #  - key: "session.timeout.ms" # refer to https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#consumerconfigs_session.timeout.ms
 | 
			
		||||
    #    value: "${TB_QUEUE_KAFKA_SESSION_TIMEOUT_MS:10000}" # (10 seconds)
 | 
			
		||||
    topic-properties:
 | 
			
		||||
      rule-engine: "${TB_QUEUE_KAFKA_RE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
 | 
			
		||||
      core: "${TB_QUEUE_KAFKA_CORE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
 | 
			
		||||
 | 
			
		||||
@ -152,17 +152,24 @@ queue:
 | 
			
		||||
    max.in.flight.requests.per.connection: "${TB_KAFKA_MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION:5}"
 | 
			
		||||
    buffer.memory: "${TB_BUFFER_MEMORY:33554432}"
 | 
			
		||||
    replication_factor: "${TB_QUEUE_KAFKA_REPLICATION_FACTOR:1}"
 | 
			
		||||
    max_poll_interval_ms: "${TB_QUEUE_KAFKA_MAX_POLL_INTERVAL_MS:300000}"
 | 
			
		||||
    max_poll_records: "${TB_QUEUE_KAFKA_MAX_POLL_RECORDS:8192}"
 | 
			
		||||
    max_partition_fetch_bytes: "${TB_QUEUE_KAFKA_MAX_PARTITION_FETCH_BYTES:16777216}"
 | 
			
		||||
    fetch_max_bytes: "${TB_QUEUE_KAFKA_FETCH_MAX_BYTES:134217728}"
 | 
			
		||||
    request.timeout.ms: "${TB_QUEUE_KAFKA_REQUEST_TIMEOUT_MS:30000}" # (30 seconds) # refer to https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#producerconfigs_request.timeout.ms
 | 
			
		||||
    session.timeout.ms: "${TB_QUEUE_KAFKA_SESSION_TIMEOUT_MS:10000}" # (10 seconds) # refer to https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#consumerconfigs_session.timeout.ms
 | 
			
		||||
    use_confluent_cloud: "${TB_QUEUE_KAFKA_USE_CONFLUENT_CLOUD:false}"
 | 
			
		||||
    confluent:
 | 
			
		||||
      ssl.algorithm: "${TB_QUEUE_KAFKA_CONFLUENT_SSL_ALGORITHM:https}"
 | 
			
		||||
      sasl.mechanism: "${TB_QUEUE_KAFKA_CONFLUENT_SASL_MECHANISM:PLAIN}"
 | 
			
		||||
      sasl.config: "${TB_QUEUE_KAFKA_CONFLUENT_SASL_JAAS_CONFIG:org.apache.kafka.common.security.plain.PlainLoginModule required username=\"CLUSTER_API_KEY\" password=\"CLUSTER_API_SECRET\";}"
 | 
			
		||||
      security.protocol: "${TB_QUEUE_KAFKA_CONFLUENT_SECURITY_PROTOCOL:SASL_SSL}"
 | 
			
		||||
    other: # In this section you can specify custom parameters for Kafka consumer/producer and expose the env variables to configure outside
 | 
			
		||||
      - key: "request.timeout.ms" # refer to https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#producerconfigs_request.timeout.ms
 | 
			
		||||
        value: "${TB_QUEUE_KAFKA_REQUEST_TIMEOUT_MS:30000}" # (30 seconds)
 | 
			
		||||
      - key: "session.timeout.ms" # refer to https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#consumerconfigs_session.timeout.ms
 | 
			
		||||
        value: "${TB_QUEUE_KAFKA_SESSION_TIMEOUT_MS:10000}" # (10 seconds)
 | 
			
		||||
    other-inline: "${TB_QUEUE_KAFKA_OTHER_PROPERTIES:}" # In this section you can specify custom parameters (semicolon separated) for Kafka consumer/producer/admin # Example "metrics.recording.level:INFO;metrics.sample.window.ms:30000"
 | 
			
		||||
    other: # DEPRECATED. In this section you can specify custom parameters for Kafka consumer/producer and expose the env variables to configure outside
 | 
			
		||||
    #  - key: "request.timeout.ms" # refer to https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#producerconfigs_request.timeout.ms
 | 
			
		||||
    #    value: "${TB_QUEUE_KAFKA_REQUEST_TIMEOUT_MS:30000}" # (30 seconds)
 | 
			
		||||
    #  - key: "session.timeout.ms" # refer to https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#consumerconfigs_session.timeout.ms
 | 
			
		||||
    #    value: "${TB_QUEUE_KAFKA_SESSION_TIMEOUT_MS:10000}" # (10 seconds)
 | 
			
		||||
    topic-properties:
 | 
			
		||||
      rule-engine: "${TB_QUEUE_KAFKA_RE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
 | 
			
		||||
      core: "${TB_QUEUE_KAFKA_CORE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
 | 
			
		||||
 | 
			
		||||
@ -233,17 +233,24 @@ queue:
 | 
			
		||||
    max.in.flight.requests.per.connection: "${TB_KAFKA_MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION:5}"
 | 
			
		||||
    buffer.memory: "${TB_BUFFER_MEMORY:33554432}"
 | 
			
		||||
    replication_factor: "${TB_QUEUE_KAFKA_REPLICATION_FACTOR:1}"
 | 
			
		||||
    max_poll_interval_ms: "${TB_QUEUE_KAFKA_MAX_POLL_INTERVAL_MS:300000}"
 | 
			
		||||
    max_poll_records: "${TB_QUEUE_KAFKA_MAX_POLL_RECORDS:8192}"
 | 
			
		||||
    max_partition_fetch_bytes: "${TB_QUEUE_KAFKA_MAX_PARTITION_FETCH_BYTES:16777216}"
 | 
			
		||||
    fetch_max_bytes: "${TB_QUEUE_KAFKA_FETCH_MAX_BYTES:134217728}"
 | 
			
		||||
    request.timeout.ms: "${TB_QUEUE_KAFKA_REQUEST_TIMEOUT_MS:30000}" # (30 seconds) # refer to https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#producerconfigs_request.timeout.ms
 | 
			
		||||
    session.timeout.ms: "${TB_QUEUE_KAFKA_SESSION_TIMEOUT_MS:10000}" # (10 seconds) # refer to https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#consumerconfigs_session.timeout.ms
 | 
			
		||||
    use_confluent_cloud: "${TB_QUEUE_KAFKA_USE_CONFLUENT_CLOUD:false}"
 | 
			
		||||
    confluent:
 | 
			
		||||
      ssl.algorithm: "${TB_QUEUE_KAFKA_CONFLUENT_SSL_ALGORITHM:https}"
 | 
			
		||||
      sasl.mechanism: "${TB_QUEUE_KAFKA_CONFLUENT_SASL_MECHANISM:PLAIN}"
 | 
			
		||||
      sasl.config: "${TB_QUEUE_KAFKA_CONFLUENT_SASL_JAAS_CONFIG:org.apache.kafka.common.security.plain.PlainLoginModule required username=\"CLUSTER_API_KEY\" password=\"CLUSTER_API_SECRET\";}"
 | 
			
		||||
      security.protocol: "${TB_QUEUE_KAFKA_CONFLUENT_SECURITY_PROTOCOL:SASL_SSL}"
 | 
			
		||||
    other: # In this section you can specify custom parameters for Kafka consumer/producer and expose the env variables to configure outside
 | 
			
		||||
      - key: "request.timeout.ms" # refer to https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#producerconfigs_request.timeout.ms
 | 
			
		||||
        value: "${TB_QUEUE_KAFKA_REQUEST_TIMEOUT_MS:30000}" # (30 seconds)
 | 
			
		||||
      - key: "session.timeout.ms" # refer to https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#consumerconfigs_session.timeout.ms
 | 
			
		||||
        value: "${TB_QUEUE_KAFKA_SESSION_TIMEOUT_MS:10000}" # (10 seconds)
 | 
			
		||||
    other-inline: "${TB_QUEUE_KAFKA_OTHER_PROPERTIES:}" # In this section you can specify custom parameters (semicolon separated) for Kafka consumer/producer/admin # Example "metrics.recording.level:INFO;metrics.sample.window.ms:30000"
 | 
			
		||||
    other: # DEPRECATED. In this section you can specify custom parameters for Kafka consumer/producer and expose the env variables to configure outside
 | 
			
		||||
    #  - key: "request.timeout.ms" # refer to https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#producerconfigs_request.timeout.ms
 | 
			
		||||
    #    value: "${TB_QUEUE_KAFKA_REQUEST_TIMEOUT_MS:30000}" # (30 seconds)
 | 
			
		||||
    #  - key: "session.timeout.ms" # refer to https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#consumerconfigs_session.timeout.ms
 | 
			
		||||
    #    value: "${TB_QUEUE_KAFKA_SESSION_TIMEOUT_MS:10000}" # (10 seconds)
 | 
			
		||||
    topic-properties:
 | 
			
		||||
      rule-engine: "${TB_QUEUE_KAFKA_RE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
 | 
			
		||||
      core: "${TB_QUEUE_KAFKA_CORE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
 | 
			
		||||
 | 
			
		||||
@ -182,17 +182,24 @@ queue:
 | 
			
		||||
    max.in.flight.requests.per.connection: "${TB_KAFKA_MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION:5}"
 | 
			
		||||
    buffer.memory: "${TB_BUFFER_MEMORY:33554432}"
 | 
			
		||||
    replication_factor: "${TB_QUEUE_KAFKA_REPLICATION_FACTOR:1}"
 | 
			
		||||
    max_poll_interval_ms: "${TB_QUEUE_KAFKA_MAX_POLL_INTERVAL_MS:300000}"
 | 
			
		||||
    max_poll_records: "${TB_QUEUE_KAFKA_MAX_POLL_RECORDS:8192}"
 | 
			
		||||
    max_partition_fetch_bytes: "${TB_QUEUE_KAFKA_MAX_PARTITION_FETCH_BYTES:16777216}"
 | 
			
		||||
    fetch_max_bytes: "${TB_QUEUE_KAFKA_FETCH_MAX_BYTES:134217728}"
 | 
			
		||||
    request.timeout.ms: "${TB_QUEUE_KAFKA_REQUEST_TIMEOUT_MS:30000}" # (30 seconds) # refer to https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#producerconfigs_request.timeout.ms
 | 
			
		||||
    session.timeout.ms: "${TB_QUEUE_KAFKA_SESSION_TIMEOUT_MS:10000}" # (10 seconds) # refer to https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#consumerconfigs_session.timeout.ms
 | 
			
		||||
    use_confluent_cloud: "${TB_QUEUE_KAFKA_USE_CONFLUENT_CLOUD:false}"
 | 
			
		||||
    confluent:
 | 
			
		||||
      ssl.algorithm: "${TB_QUEUE_KAFKA_CONFLUENT_SSL_ALGORITHM:https}"
 | 
			
		||||
      sasl.mechanism: "${TB_QUEUE_KAFKA_CONFLUENT_SASL_MECHANISM:PLAIN}"
 | 
			
		||||
      sasl.config: "${TB_QUEUE_KAFKA_CONFLUENT_SASL_JAAS_CONFIG:org.apache.kafka.common.security.plain.PlainLoginModule required username=\"CLUSTER_API_KEY\" password=\"CLUSTER_API_SECRET\";}"
 | 
			
		||||
      security.protocol: "${TB_QUEUE_KAFKA_CONFLUENT_SECURITY_PROTOCOL:SASL_SSL}"
 | 
			
		||||
    other: # In this section you can specify custom parameters for Kafka consumer/producer and expose the env variables to configure outside
 | 
			
		||||
      - key: "request.timeout.ms" # refer to https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#producerconfigs_request.timeout.ms
 | 
			
		||||
        value: "${TB_QUEUE_KAFKA_REQUEST_TIMEOUT_MS:30000}" # (30 seconds)
 | 
			
		||||
      - key: "session.timeout.ms" # refer to https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#consumerconfigs_session.timeout.ms
 | 
			
		||||
        value: "${TB_QUEUE_KAFKA_SESSION_TIMEOUT_MS:10000}" # (10 seconds)
 | 
			
		||||
    other-inline: "${TB_QUEUE_KAFKA_OTHER_PROPERTIES:}" # In this section you can specify custom parameters (semicolon separated) for Kafka consumer/producer/admin # Example "metrics.recording.level:INFO;metrics.sample.window.ms:30000"
 | 
			
		||||
    other: # DEPRECATED. In this section you can specify custom parameters for Kafka consumer/producer and expose the env variables to configure outside
 | 
			
		||||
    #  - key: "request.timeout.ms" # refer to https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#producerconfigs_request.timeout.ms
 | 
			
		||||
    #    value: "${TB_QUEUE_KAFKA_REQUEST_TIMEOUT_MS:30000}" # (30 seconds)
 | 
			
		||||
    #  - key: "session.timeout.ms" # refer to https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#consumerconfigs_session.timeout.ms
 | 
			
		||||
    #    value: "${TB_QUEUE_KAFKA_SESSION_TIMEOUT_MS:10000}" # (10 seconds)
 | 
			
		||||
    topic-properties:
 | 
			
		||||
      rule-engine: "${TB_QUEUE_KAFKA_RE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
 | 
			
		||||
      core: "${TB_QUEUE_KAFKA_CORE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
 | 
			
		||||
 | 
			
		||||
@ -128,17 +128,24 @@ queue:
 | 
			
		||||
    max.in.flight.requests.per.connection: "${TB_KAFKA_MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION:5}"
 | 
			
		||||
    buffer.memory: "${TB_BUFFER_MEMORY:33554432}"
 | 
			
		||||
    replication_factor: "${TB_QUEUE_KAFKA_REPLICATION_FACTOR:1}"
 | 
			
		||||
    max_poll_interval_ms: "${TB_QUEUE_KAFKA_MAX_POLL_INTERVAL_MS:300000}"
 | 
			
		||||
    max_poll_records: "${TB_QUEUE_KAFKA_MAX_POLL_RECORDS:8192}"
 | 
			
		||||
    max_partition_fetch_bytes: "${TB_QUEUE_KAFKA_MAX_PARTITION_FETCH_BYTES:16777216}"
 | 
			
		||||
    fetch_max_bytes: "${TB_QUEUE_KAFKA_FETCH_MAX_BYTES:134217728}"
 | 
			
		||||
    request.timeout.ms: "${TB_QUEUE_KAFKA_REQUEST_TIMEOUT_MS:30000}" # (30 seconds) # refer to https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#producerconfigs_request.timeout.ms
 | 
			
		||||
    session.timeout.ms: "${TB_QUEUE_KAFKA_SESSION_TIMEOUT_MS:10000}" # (10 seconds) # refer to https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#consumerconfigs_session.timeout.ms
 | 
			
		||||
    use_confluent_cloud: "${TB_QUEUE_KAFKA_USE_CONFLUENT_CLOUD:false}"
 | 
			
		||||
    confluent:
 | 
			
		||||
      ssl.algorithm: "${TB_QUEUE_KAFKA_CONFLUENT_SSL_ALGORITHM:https}"
 | 
			
		||||
      sasl.mechanism: "${TB_QUEUE_KAFKA_CONFLUENT_SASL_MECHANISM:PLAIN}"
 | 
			
		||||
      sasl.config: "${TB_QUEUE_KAFKA_CONFLUENT_SASL_JAAS_CONFIG:org.apache.kafka.common.security.plain.PlainLoginModule required username=\"CLUSTER_API_KEY\" password=\"CLUSTER_API_SECRET\";}"
 | 
			
		||||
      security.protocol: "${TB_QUEUE_KAFKA_CONFLUENT_SECURITY_PROTOCOL:SASL_SSL}"
 | 
			
		||||
    other: # In this section you can specify custom parameters for Kafka consumer/producer and expose the env variables to configure outside
 | 
			
		||||
      - key: "request.timeout.ms" # refer to https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#producerconfigs_request.timeout.ms
 | 
			
		||||
        value: "${TB_QUEUE_KAFKA_REQUEST_TIMEOUT_MS:30000}" # (30 seconds)
 | 
			
		||||
      - key: "session.timeout.ms" # refer to https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#consumerconfigs_session.timeout.ms
 | 
			
		||||
        value: "${TB_QUEUE_KAFKA_SESSION_TIMEOUT_MS:10000}" # (10 seconds)
 | 
			
		||||
    other-inline: "${TB_QUEUE_KAFKA_OTHER_PROPERTIES:}" # In this section you can specify custom parameters (semicolon separated) for Kafka consumer/producer/admin # Example "metrics.recording.level:INFO;metrics.sample.window.ms:30000"
 | 
			
		||||
    other: # DEPRECATED. In this section you can specify custom parameters for Kafka consumer/producer and expose the env variables to configure outside
 | 
			
		||||
    #  - key: "request.timeout.ms" # refer to https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#producerconfigs_request.timeout.ms
 | 
			
		||||
    #    value: "${TB_QUEUE_KAFKA_REQUEST_TIMEOUT_MS:30000}" # (30 seconds)
 | 
			
		||||
    #  - key: "session.timeout.ms" # refer to https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#consumerconfigs_session.timeout.ms
 | 
			
		||||
    #    value: "${TB_QUEUE_KAFKA_SESSION_TIMEOUT_MS:10000}" # (10 seconds)
 | 
			
		||||
    topic-properties:
 | 
			
		||||
      rule-engine: "${TB_QUEUE_KAFKA_RE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
 | 
			
		||||
      core: "${TB_QUEUE_KAFKA_CORE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user