Merge pull request #3414 from YevhenBondarenko/develop/2.5.5-kafka-partitions
added partition property for kafka
This commit is contained in:
		
						commit
						3f9f6efc26
					
				@ -617,11 +617,11 @@ queue:
 | 
			
		||||
      security.protocol: "${TB_QUEUE_KAFKA_CONFLUENT_SECURITY_PROTOCOL:SASL_SSL}"
 | 
			
		||||
    other:
 | 
			
		||||
    topic-properties:
 | 
			
		||||
      rule-engine: "${TB_QUEUE_KAFKA_RE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}"
 | 
			
		||||
      core: "${TB_QUEUE_KAFKA_CORE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}"
 | 
			
		||||
      transport-api: "${TB_QUEUE_KAFKA_TA_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}"
 | 
			
		||||
      notifications: "${TB_QUEUE_KAFKA_NOTIFICATIONS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}"
 | 
			
		||||
      js-executor: "${TB_QUEUE_KAFKA_JE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:104857600}"
 | 
			
		||||
      rule-engine: "${TB_QUEUE_KAFKA_RE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1}"
 | 
			
		||||
      core: "${TB_QUEUE_KAFKA_CORE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1}"
 | 
			
		||||
      transport-api: "${TB_QUEUE_KAFKA_TA_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1}"
 | 
			
		||||
      notifications: "${TB_QUEUE_KAFKA_NOTIFICATIONS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1}"
 | 
			
		||||
      js-executor: "${TB_QUEUE_KAFKA_JE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:104857600;partitions:100}"
 | 
			
		||||
  aws_sqs:
 | 
			
		||||
    use_default_credential_provider_chain: "${TB_QUEUE_AWS_SQS_USE_DEFAULT_CREDENTIAL_PROVIDER_CHAIN:false}"
 | 
			
		||||
    access_key_id: "${TB_QUEUE_AWS_SQS_ACCESS_KEY_ID:YOUR_KEY}"
 | 
			
		||||
 | 
			
		||||
@ -37,6 +37,7 @@ public class TbKafkaAdmin implements TbQueueAdmin {
 | 
			
		||||
    private final AdminClient client;
 | 
			
		||||
    private final Map<String, String> topicConfigs;
 | 
			
		||||
    private final Set<String> topics = ConcurrentHashMap.newKeySet();
 | 
			
		||||
    private final int numPartitions;
 | 
			
		||||
 | 
			
		||||
    private final short replicationFactor;
 | 
			
		||||
 | 
			
		||||
@ -50,6 +51,13 @@ public class TbKafkaAdmin implements TbQueueAdmin {
 | 
			
		||||
            log.error("Failed to get all topics.", e);
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        String numPartitionsStr = topicConfigs.get("partitions");
 | 
			
		||||
        if (numPartitionsStr != null) {
 | 
			
		||||
            numPartitions = Integer.parseInt(numPartitionsStr);
 | 
			
		||||
            topicConfigs.remove("partitions");
 | 
			
		||||
        } else {
 | 
			
		||||
            numPartitions = 1;
 | 
			
		||||
        }
 | 
			
		||||
        replicationFactor = settings.getReplicationFactor();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -59,7 +67,7 @@ public class TbKafkaAdmin implements TbQueueAdmin {
 | 
			
		||||
            return;
 | 
			
		||||
        }
 | 
			
		||||
        try {
 | 
			
		||||
            NewTopic newTopic = new NewTopic(topic, 1, replicationFactor).configs(topicConfigs);
 | 
			
		||||
            NewTopic newTopic = new NewTopic(topic, numPartitions, replicationFactor).configs(topicConfigs);
 | 
			
		||||
            createTopic(newTopic).values().get(topic).get();
 | 
			
		||||
            topics.add(topic);
 | 
			
		||||
        } catch (ExecutionException ee) {
 | 
			
		||||
 | 
			
		||||
@ -1,2 +1,3 @@
 | 
			
		||||
TB_QUEUE_TYPE=kafka
 | 
			
		||||
TB_KAFKA_SERVERS=kafka:9092
 | 
			
		||||
TB_QUEUE_KAFKA_JE_TOPIC_PROPERTIES=retention.ms:604800000;segment.bytes:26214400;retention.bytes:104857600;partitions:100
 | 
			
		||||
 | 
			
		||||
@ -25,7 +25,7 @@ kafka:
 | 
			
		||||
    # Kafka Bootstrap Servers
 | 
			
		||||
    servers: "localhost:9092"
 | 
			
		||||
  replication_factor: "1"
 | 
			
		||||
  topic_properties: "retention.ms:604800000;segment.bytes:26214400;retention.bytes:104857600"
 | 
			
		||||
  topic_properties: "retention.ms:604800000;segment.bytes:26214400;retention.bytes:104857600;partitions:100"
 | 
			
		||||
  use_confluent_cloud: false
 | 
			
		||||
  confluent:
 | 
			
		||||
    sasl:
 | 
			
		||||
 | 
			
		||||
@ -34,7 +34,7 @@ function KafkaProducer() {
 | 
			
		||||
    this.send = async (responseTopic, scriptId, rawResponse, headers) => {
 | 
			
		||||
 | 
			
		||||
        if (!topics.includes(responseTopic)) {
 | 
			
		||||
            let createResponseTopicResult = await createTopic(responseTopic);
 | 
			
		||||
            let createResponseTopicResult = await createTopic(responseTopic, 1);
 | 
			
		||||
            topics.push(responseTopic);
 | 
			
		||||
            if (createResponseTopicResult) {
 | 
			
		||||
                logger.info('Created new topic: %s', requestTopic);
 | 
			
		||||
@ -88,7 +88,18 @@ function KafkaProducer() {
 | 
			
		||||
        kafkaAdmin = kafkaClient.admin();
 | 
			
		||||
        await kafkaAdmin.connect();
 | 
			
		||||
 | 
			
		||||
        let createRequestTopicResult = await createTopic(requestTopic);
 | 
			
		||||
        let partitions = 1;
 | 
			
		||||
 | 
			
		||||
        for (let i = 0; i < configEntries.length; i++) {
 | 
			
		||||
            let param = configEntries[i];
 | 
			
		||||
            if (param.name === 'partitions') {
 | 
			
		||||
                partitions = param.value;
 | 
			
		||||
                configEntries.splice(i, 1);
 | 
			
		||||
                break;
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        let createRequestTopicResult = await createTopic(requestTopic, partitions);
 | 
			
		||||
 | 
			
		||||
        if (createRequestTopicResult) {
 | 
			
		||||
            logger.info('Created new topic: %s', requestTopic);
 | 
			
		||||
@ -121,10 +132,11 @@ function KafkaProducer() {
 | 
			
		||||
    }
 | 
			
		||||
})();
 | 
			
		||||
 | 
			
		||||
function createTopic(topic) {
 | 
			
		||||
function createTopic(topic, partitions) {
 | 
			
		||||
    return kafkaAdmin.createTopics({
 | 
			
		||||
        topics: [{
 | 
			
		||||
            topic: topic,
 | 
			
		||||
            numPartitions: partitions,
 | 
			
		||||
            replicationFactor: replicationFactor,
 | 
			
		||||
            configEntries: configEntries
 | 
			
		||||
        }]
 | 
			
		||||
 | 
			
		||||
@ -77,11 +77,11 @@ queue:
 | 
			
		||||
      security.protocol: "${TB_QUEUE_KAFKA_CONFLUENT_SECURITY_PROTOCOL:SASL_SSL}"
 | 
			
		||||
    other:
 | 
			
		||||
    topic-properties:
 | 
			
		||||
      rule-engine: "${TB_QUEUE_KAFKA_RE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}"
 | 
			
		||||
      core: "${TB_QUEUE_KAFKA_CORE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}"
 | 
			
		||||
      transport-api: "${TB_QUEUE_KAFKA_TA_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}"
 | 
			
		||||
      notifications: "${TB_QUEUE_KAFKA_NOTIFICATIONS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}"
 | 
			
		||||
      js-executor: "${TB_QUEUE_KAFKA_JE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:104857600}"
 | 
			
		||||
      rule-engine: "${TB_QUEUE_KAFKA_RE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1}"
 | 
			
		||||
      core: "${TB_QUEUE_KAFKA_CORE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1}"
 | 
			
		||||
      transport-api: "${TB_QUEUE_KAFKA_TA_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1}"
 | 
			
		||||
      notifications: "${TB_QUEUE_KAFKA_NOTIFICATIONS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1}"
 | 
			
		||||
      js-executor: "${TB_QUEUE_KAFKA_JE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:104857600;partitions:100}"
 | 
			
		||||
  aws_sqs:
 | 
			
		||||
    use_default_credential_provider_chain: "${TB_QUEUE_AWS_SQS_USE_DEFAULT_CREDENTIAL_PROVIDER_CHAIN:false}"
 | 
			
		||||
    access_key_id: "${TB_QUEUE_AWS_SQS_ACCESS_KEY_ID:YOUR_KEY}"
 | 
			
		||||
 | 
			
		||||
@ -70,11 +70,11 @@ queue:
 | 
			
		||||
      security.protocol: "${TB_QUEUE_KAFKA_CONFLUENT_SECURITY_PROTOCOL:SASL_SSL}"
 | 
			
		||||
    other:
 | 
			
		||||
    topic-properties:
 | 
			
		||||
      rule-engine: "${TB_QUEUE_KAFKA_RE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}"
 | 
			
		||||
      core: "${TB_QUEUE_KAFKA_CORE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}"
 | 
			
		||||
      transport-api: "${TB_QUEUE_KAFKA_TA_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}"
 | 
			
		||||
      notifications: "${TB_QUEUE_KAFKA_NOTIFICATIONS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}"
 | 
			
		||||
      js-executor: "${TB_QUEUE_KAFKA_JE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:104857600}"
 | 
			
		||||
      rule-engine: "${TB_QUEUE_KAFKA_RE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1}"
 | 
			
		||||
      core: "${TB_QUEUE_KAFKA_CORE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1}"
 | 
			
		||||
      transport-api: "${TB_QUEUE_KAFKA_TA_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1}"
 | 
			
		||||
      notifications: "${TB_QUEUE_KAFKA_NOTIFICATIONS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1}"
 | 
			
		||||
      js-executor: "${TB_QUEUE_KAFKA_JE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:104857600;partitions:100}"
 | 
			
		||||
  aws_sqs:
 | 
			
		||||
    use_default_credential_provider_chain: "${TB_QUEUE_AWS_SQS_USE_DEFAULT_CREDENTIAL_PROVIDER_CHAIN:false}"
 | 
			
		||||
    access_key_id: "${TB_QUEUE_AWS_SQS_ACCESS_KEY_ID:YOUR_KEY}"
 | 
			
		||||
 | 
			
		||||
@ -98,11 +98,11 @@ queue:
 | 
			
		||||
      security.protocol: "${TB_QUEUE_KAFKA_CONFLUENT_SECURITY_PROTOCOL:SASL_SSL}"
 | 
			
		||||
    other:
 | 
			
		||||
    topic-properties:
 | 
			
		||||
      rule-engine: "${TB_QUEUE_KAFKA_RE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}"
 | 
			
		||||
      core: "${TB_QUEUE_KAFKA_CORE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}"
 | 
			
		||||
      transport-api: "${TB_QUEUE_KAFKA_TA_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}"
 | 
			
		||||
      notifications: "${TB_QUEUE_KAFKA_NOTIFICATIONS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}"
 | 
			
		||||
      js-executor: "${TB_QUEUE_KAFKA_JE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:104857600}"
 | 
			
		||||
      rule-engine: "${TB_QUEUE_KAFKA_RE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1}"
 | 
			
		||||
      core: "${TB_QUEUE_KAFKA_CORE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1}"
 | 
			
		||||
      transport-api: "${TB_QUEUE_KAFKA_TA_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1}"
 | 
			
		||||
      notifications: "${TB_QUEUE_KAFKA_NOTIFICATIONS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1}"
 | 
			
		||||
      js-executor: "${TB_QUEUE_KAFKA_JE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:104857600;partitions:100}"
 | 
			
		||||
  aws_sqs:
 | 
			
		||||
    use_default_credential_provider_chain: "${TB_QUEUE_AWS_SQS_USE_DEFAULT_CREDENTIAL_PROVIDER_CHAIN:false}"
 | 
			
		||||
    access_key_id: "${TB_QUEUE_AWS_SQS_ACCESS_KEY_ID:YOUR_KEY}"
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user