From f16672cbf69717f8506937ec20a87dee4d7052f3 Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Wed, 2 Sep 2020 18:12:08 +0300 Subject: [PATCH] added partition property for kafka --- application/src/main/resources/thingsboard.yml | 10 +++++----- .../server/queue/kafka/TbKafkaAdmin.java | 10 +++++++++- docker/queue-kafka.env | 1 + msa/js-executor/config/default.yml | 2 +- msa/js-executor/queue/kafkaTemplate.js | 18 +++++++++++++++--- .../src/main/resources/tb-coap-transport.yml | 10 +++++----- .../src/main/resources/tb-http-transport.yml | 10 +++++----- .../src/main/resources/tb-mqtt-transport.yml | 10 +++++----- 8 files changed, 46 insertions(+), 25 deletions(-) diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index ac76b67cd3..6400a6bbf0 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -617,11 +617,11 @@ queue: security.protocol: "${TB_QUEUE_KAFKA_CONFLUENT_SECURITY_PROTOCOL:SASL_SSL}" other: topic-properties: - rule-engine: "${TB_QUEUE_KAFKA_RE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}" - core: "${TB_QUEUE_KAFKA_CORE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}" - transport-api: "${TB_QUEUE_KAFKA_TA_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}" - notifications: "${TB_QUEUE_KAFKA_NOTIFICATIONS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}" - js-executor: "${TB_QUEUE_KAFKA_JE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:104857600}" + rule-engine: "${TB_QUEUE_KAFKA_RE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1}" + core: "${TB_QUEUE_KAFKA_CORE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1}" + transport-api: "${TB_QUEUE_KAFKA_TA_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1}" + notifications: "${TB_QUEUE_KAFKA_NOTIFICATIONS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1}" + js-executor: "${TB_QUEUE_KAFKA_JE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:104857600;partitions:100}" aws_sqs: use_default_credential_provider_chain: "${TB_QUEUE_AWS_SQS_USE_DEFAULT_CREDENTIAL_PROVIDER_CHAIN:false}" access_key_id: "${TB_QUEUE_AWS_SQS_ACCESS_KEY_ID:YOUR_KEY}" diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaAdmin.java b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaAdmin.java index b92b094af1..bee293bd6d 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaAdmin.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaAdmin.java @@ -37,6 +37,7 @@ public class TbKafkaAdmin implements TbQueueAdmin { private final AdminClient client; private final Map topicConfigs; private final Set topics = ConcurrentHashMap.newKeySet(); + private final int numPartitions; private final short replicationFactor; @@ -50,6 +51,13 @@ public class TbKafkaAdmin implements TbQueueAdmin { log.error("Failed to get all topics.", e); } + String numPartitionsStr = topicConfigs.get("partitions"); + if (numPartitionsStr != null) { + numPartitions = Integer.parseInt(numPartitionsStr); + topicConfigs.remove("partitions"); + } else { + numPartitions = 1; + } replicationFactor = settings.getReplicationFactor(); } @@ -59,7 +67,7 @@ public class TbKafkaAdmin implements TbQueueAdmin { return; } try { - NewTopic newTopic = new NewTopic(topic, 1, replicationFactor).configs(topicConfigs); + NewTopic newTopic = new NewTopic(topic, numPartitions, replicationFactor).configs(topicConfigs); createTopic(newTopic).values().get(topic).get(); topics.add(topic); } catch (ExecutionException ee) { diff --git a/docker/queue-kafka.env b/docker/queue-kafka.env index 63107942fb..0207d64ef5 100644 --- a/docker/queue-kafka.env +++ b/docker/queue-kafka.env @@ -1,2 +1,3 @@ TB_QUEUE_TYPE=kafka TB_KAFKA_SERVERS=kafka:9092 +TB_QUEUE_KAFKA_JE_TOPIC_PROPERTIES=retention.ms:604800000;segment.bytes:26214400;retention.bytes:104857600;partitions:100 diff --git a/msa/js-executor/config/default.yml b/msa/js-executor/config/default.yml index fac1ac7e8a..e6ec6f5b36 100644 --- a/msa/js-executor/config/default.yml +++ b/msa/js-executor/config/default.yml @@ -25,7 +25,7 @@ kafka: # Kafka Bootstrap Servers servers: "localhost:9092" replication_factor: "1" - topic_properties: "retention.ms:604800000;segment.bytes:26214400;retention.bytes:104857600" + topic_properties: "retention.ms:604800000;segment.bytes:26214400;retention.bytes:104857600;partitions:100" use_confluent_cloud: false confluent: sasl: diff --git a/msa/js-executor/queue/kafkaTemplate.js b/msa/js-executor/queue/kafkaTemplate.js index 0420173188..2672a8df6a 100644 --- a/msa/js-executor/queue/kafkaTemplate.js +++ b/msa/js-executor/queue/kafkaTemplate.js @@ -34,7 +34,7 @@ function KafkaProducer() { this.send = async (responseTopic, scriptId, rawResponse, headers) => { if (!topics.includes(responseTopic)) { - let createResponseTopicResult = await createTopic(responseTopic); + let createResponseTopicResult = await createTopic(responseTopic, 1); topics.push(responseTopic); if (createResponseTopicResult) { logger.info('Created new topic: %s', requestTopic); @@ -88,7 +88,18 @@ function KafkaProducer() { kafkaAdmin = kafkaClient.admin(); await kafkaAdmin.connect(); - let createRequestTopicResult = await createTopic(requestTopic); + let partitions = 1; + + for (let i = 0; i < configEntries.length; i++) { + let param = configEntries[i]; + if (param.name === 'partitions') { + partitions = param.value; + configEntries.splice(i, 1); + break; + } + } + + let createRequestTopicResult = await createTopic(requestTopic, partitions); if (createRequestTopicResult) { logger.info('Created new topic: %s', requestTopic); @@ -121,10 +132,11 @@ function KafkaProducer() { } })(); -function createTopic(topic) { +function createTopic(topic, partitions) { return kafkaAdmin.createTopics({ topics: [{ topic: topic, + numPartitions: partitions, replicationFactor: replicationFactor, configEntries: configEntries }] diff --git a/transport/coap/src/main/resources/tb-coap-transport.yml b/transport/coap/src/main/resources/tb-coap-transport.yml index f04a7559b8..3f57b32fab 100644 --- a/transport/coap/src/main/resources/tb-coap-transport.yml +++ b/transport/coap/src/main/resources/tb-coap-transport.yml @@ -77,11 +77,11 @@ queue: security.protocol: "${TB_QUEUE_KAFKA_CONFLUENT_SECURITY_PROTOCOL:SASL_SSL}" other: topic-properties: - rule-engine: "${TB_QUEUE_KAFKA_RE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}" - core: "${TB_QUEUE_KAFKA_CORE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}" - transport-api: "${TB_QUEUE_KAFKA_TA_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}" - notifications: "${TB_QUEUE_KAFKA_NOTIFICATIONS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}" - js-executor: "${TB_QUEUE_KAFKA_JE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:104857600}" + rule-engine: "${TB_QUEUE_KAFKA_RE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1}" + core: "${TB_QUEUE_KAFKA_CORE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1}" + transport-api: "${TB_QUEUE_KAFKA_TA_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1}" + notifications: "${TB_QUEUE_KAFKA_NOTIFICATIONS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1}" + js-executor: "${TB_QUEUE_KAFKA_JE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:104857600;partitions:100}" aws_sqs: use_default_credential_provider_chain: "${TB_QUEUE_AWS_SQS_USE_DEFAULT_CREDENTIAL_PROVIDER_CHAIN:false}" access_key_id: "${TB_QUEUE_AWS_SQS_ACCESS_KEY_ID:YOUR_KEY}" diff --git a/transport/http/src/main/resources/tb-http-transport.yml b/transport/http/src/main/resources/tb-http-transport.yml index a97a58cd85..77d5f30fa7 100644 --- a/transport/http/src/main/resources/tb-http-transport.yml +++ b/transport/http/src/main/resources/tb-http-transport.yml @@ -70,11 +70,11 @@ queue: security.protocol: "${TB_QUEUE_KAFKA_CONFLUENT_SECURITY_PROTOCOL:SASL_SSL}" other: topic-properties: - rule-engine: "${TB_QUEUE_KAFKA_RE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}" - core: "${TB_QUEUE_KAFKA_CORE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}" - transport-api: "${TB_QUEUE_KAFKA_TA_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}" - notifications: "${TB_QUEUE_KAFKA_NOTIFICATIONS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}" - js-executor: "${TB_QUEUE_KAFKA_JE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:104857600}" + rule-engine: "${TB_QUEUE_KAFKA_RE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1}" + core: "${TB_QUEUE_KAFKA_CORE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1}" + transport-api: "${TB_QUEUE_KAFKA_TA_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1}" + notifications: "${TB_QUEUE_KAFKA_NOTIFICATIONS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1}" + js-executor: "${TB_QUEUE_KAFKA_JE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:104857600;partitions:100}" aws_sqs: use_default_credential_provider_chain: "${TB_QUEUE_AWS_SQS_USE_DEFAULT_CREDENTIAL_PROVIDER_CHAIN:false}" access_key_id: "${TB_QUEUE_AWS_SQS_ACCESS_KEY_ID:YOUR_KEY}" diff --git a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml index b3804e3531..bc18112c8e 100644 --- a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml +++ b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml @@ -98,11 +98,11 @@ queue: security.protocol: "${TB_QUEUE_KAFKA_CONFLUENT_SECURITY_PROTOCOL:SASL_SSL}" other: topic-properties: - rule-engine: "${TB_QUEUE_KAFKA_RE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}" - core: "${TB_QUEUE_KAFKA_CORE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}" - transport-api: "${TB_QUEUE_KAFKA_TA_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}" - notifications: "${TB_QUEUE_KAFKA_NOTIFICATIONS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}" - js-executor: "${TB_QUEUE_KAFKA_JE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:104857600}" + rule-engine: "${TB_QUEUE_KAFKA_RE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1}" + core: "${TB_QUEUE_KAFKA_CORE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1}" + transport-api: "${TB_QUEUE_KAFKA_TA_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1}" + notifications: "${TB_QUEUE_KAFKA_NOTIFICATIONS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1}" + js-executor: "${TB_QUEUE_KAFKA_JE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:104857600;partitions:100}" aws_sqs: use_default_credential_provider_chain: "${TB_QUEUE_AWS_SQS_USE_DEFAULT_CREDENTIAL_PROVIDER_CHAIN:false}" access_key_id: "${TB_QUEUE_AWS_SQS_ACCESS_KEY_ID:YOUR_KEY}"