diff --git a/application/src/main/java/org/thingsboard/server/service/install/PsqlTsDatabaseUpgradeService.java b/application/src/main/java/org/thingsboard/server/service/install/PsqlTsDatabaseUpgradeService.java index ddfaf69d3a..18433c6f53 100644 --- a/application/src/main/java/org/thingsboard/server/service/install/PsqlTsDatabaseUpgradeService.java +++ b/application/src/main/java/org/thingsboard/server/service/install/PsqlTsDatabaseUpgradeService.java @@ -247,4 +247,4 @@ public class PsqlTsDatabaseUpgradeService extends AbstractSqlTsDatabaseUpgradeSe log.info("Failed to load PostgreSQL upgrade functions due to: {}", e.getMessage()); } } -} \ No newline at end of file +} diff --git a/application/src/main/java/org/thingsboard/server/service/install/TimescaleTsDatabaseUpgradeService.java b/application/src/main/java/org/thingsboard/server/service/install/TimescaleTsDatabaseUpgradeService.java index 260db325e5..756356581a 100644 --- a/application/src/main/java/org/thingsboard/server/service/install/TimescaleTsDatabaseUpgradeService.java +++ b/application/src/main/java/org/thingsboard/server/service/install/TimescaleTsDatabaseUpgradeService.java @@ -209,4 +209,4 @@ public class TimescaleTsDatabaseUpgradeService extends AbstractSqlTsDatabaseUpgr log.info("Failed to load PostgreSQL upgrade functions due to: {}", e.getMessage()); } } -} \ No newline at end of file +} diff --git a/application/src/main/java/org/thingsboard/server/service/ttl/AbstractCleanUpService.java b/application/src/main/java/org/thingsboard/server/service/ttl/AbstractCleanUpService.java index e81788958d..41f90c642c 100644 --- a/application/src/main/java/org/thingsboard/server/service/ttl/AbstractCleanUpService.java +++ b/application/src/main/java/org/thingsboard/server/service/ttl/AbstractCleanUpService.java @@ -39,8 +39,7 @@ public abstract class AbstractCleanUpService { protected String dbPassword; protected long executeQuery(Connection conn, String query) throws SQLException { - try (Statement statement = conn.createStatement()) { - ResultSet resultSet = statement.executeQuery(query); + try (Statement statement = conn.createStatement(); ResultSet resultSet = statement.executeQuery(query)) { if (log.isDebugEnabled()) { getWarnings(statement); } diff --git a/application/src/main/java/org/thingsboard/server/service/ttl/timeseries/TimescaleTimeseriesCleanUpService.java b/application/src/main/java/org/thingsboard/server/service/ttl/timeseries/TimescaleTimeseriesCleanUpService.java index 8a1b17c549..40febd1988 100644 --- a/application/src/main/java/org/thingsboard/server/service/ttl/timeseries/TimescaleTimeseriesCleanUpService.java +++ b/application/src/main/java/org/thingsboard/server/service/ttl/timeseries/TimescaleTimeseriesCleanUpService.java @@ -33,4 +33,4 @@ public class TimescaleTimeseriesCleanUpService extends AbstractTimeseriesCleanUp long totalEntitiesTelemetryRemoved = executeQuery(connection, "call cleanup_timeseries_by_ttl('" + ModelConstants.NULL_UUID + "'," + systemTtl + ", 0);"); log.info("Total telemetry removed stats by TTL for entities: [{}]", totalEntitiesTelemetryRemoved); } -} \ No newline at end of file +} diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 448d2f4e9d..92893a9c5f 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -646,11 +646,11 @@ queue: security.protocol: "${TB_QUEUE_KAFKA_CONFLUENT_SECURITY_PROTOCOL:SASL_SSL}" other: topic-properties: - rule-engine: "${TB_QUEUE_KAFKA_RE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}" - core: "${TB_QUEUE_KAFKA_CORE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}" - transport-api: "${TB_QUEUE_KAFKA_TA_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}" - notifications: "${TB_QUEUE_KAFKA_NOTIFICATIONS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}" - js-executor: "${TB_QUEUE_KAFKA_JE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:104857600}" + rule-engine: "${TB_QUEUE_KAFKA_RE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1}" + core: "${TB_QUEUE_KAFKA_CORE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1}" + transport-api: "${TB_QUEUE_KAFKA_TA_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1}" + notifications: "${TB_QUEUE_KAFKA_NOTIFICATIONS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1}" + js-executor: "${TB_QUEUE_KAFKA_JE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:104857600;partitions:100}" aws_sqs: use_default_credential_provider_chain: "${TB_QUEUE_AWS_SQS_USE_DEFAULT_CREDENTIAL_PROVIDER_CHAIN:false}" access_key_id: "${TB_QUEUE_AWS_SQS_ACCESS_KEY_ID:YOUR_KEY}" diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaAdmin.java b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaAdmin.java index b92b094af1..bee293bd6d 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaAdmin.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaAdmin.java @@ -37,6 +37,7 @@ public class TbKafkaAdmin implements TbQueueAdmin { private final AdminClient client; private final Map topicConfigs; private final Set topics = ConcurrentHashMap.newKeySet(); + private final int numPartitions; private final short replicationFactor; @@ -50,6 +51,13 @@ public class TbKafkaAdmin implements TbQueueAdmin { log.error("Failed to get all topics.", e); } + String numPartitionsStr = topicConfigs.get("partitions"); + if (numPartitionsStr != null) { + numPartitions = Integer.parseInt(numPartitionsStr); + topicConfigs.remove("partitions"); + } else { + numPartitions = 1; + } replicationFactor = settings.getReplicationFactor(); } @@ -59,7 +67,7 @@ public class TbKafkaAdmin implements TbQueueAdmin { return; } try { - NewTopic newTopic = new NewTopic(topic, 1, replicationFactor).configs(topicConfigs); + NewTopic newTopic = new NewTopic(topic, numPartitions, replicationFactor).configs(topicConfigs); createTopic(newTopic).values().get(topic).get(); topics.add(topic); } catch (ExecutionException ee) { diff --git a/docker/queue-kafka.env b/docker/queue-kafka.env index 63107942fb..0207d64ef5 100644 --- a/docker/queue-kafka.env +++ b/docker/queue-kafka.env @@ -1,2 +1,3 @@ TB_QUEUE_TYPE=kafka TB_KAFKA_SERVERS=kafka:9092 +TB_QUEUE_KAFKA_JE_TOPIC_PROPERTIES=retention.ms:604800000;segment.bytes:26214400;retention.bytes:104857600;partitions:100 diff --git a/msa/js-executor/config/default.yml b/msa/js-executor/config/default.yml index fac1ac7e8a..e6ec6f5b36 100644 --- a/msa/js-executor/config/default.yml +++ b/msa/js-executor/config/default.yml @@ -25,7 +25,7 @@ kafka: # Kafka Bootstrap Servers servers: "localhost:9092" replication_factor: "1" - topic_properties: "retention.ms:604800000;segment.bytes:26214400;retention.bytes:104857600" + topic_properties: "retention.ms:604800000;segment.bytes:26214400;retention.bytes:104857600;partitions:100" use_confluent_cloud: false confluent: sasl: diff --git a/msa/js-executor/queue/kafkaTemplate.js b/msa/js-executor/queue/kafkaTemplate.js index 0420173188..2672a8df6a 100644 --- a/msa/js-executor/queue/kafkaTemplate.js +++ b/msa/js-executor/queue/kafkaTemplate.js @@ -34,7 +34,7 @@ function KafkaProducer() { this.send = async (responseTopic, scriptId, rawResponse, headers) => { if (!topics.includes(responseTopic)) { - let createResponseTopicResult = await createTopic(responseTopic); + let createResponseTopicResult = await createTopic(responseTopic, 1); topics.push(responseTopic); if (createResponseTopicResult) { logger.info('Created new topic: %s', requestTopic); @@ -88,7 +88,18 @@ function KafkaProducer() { kafkaAdmin = kafkaClient.admin(); await kafkaAdmin.connect(); - let createRequestTopicResult = await createTopic(requestTopic); + let partitions = 1; + + for (let i = 0; i < configEntries.length; i++) { + let param = configEntries[i]; + if (param.name === 'partitions') { + partitions = param.value; + configEntries.splice(i, 1); + break; + } + } + + let createRequestTopicResult = await createTopic(requestTopic, partitions); if (createRequestTopicResult) { logger.info('Created new topic: %s', requestTopic); @@ -121,10 +132,11 @@ function KafkaProducer() { } })(); -function createTopic(topic) { +function createTopic(topic, partitions) { return kafkaAdmin.createTopics({ topics: [{ topic: topic, + numPartitions: partitions, replicationFactor: replicationFactor, configEntries: configEntries }] diff --git a/transport/coap/src/main/resources/tb-coap-transport.yml b/transport/coap/src/main/resources/tb-coap-transport.yml index f04a7559b8..3f57b32fab 100644 --- a/transport/coap/src/main/resources/tb-coap-transport.yml +++ b/transport/coap/src/main/resources/tb-coap-transport.yml @@ -77,11 +77,11 @@ queue: security.protocol: "${TB_QUEUE_KAFKA_CONFLUENT_SECURITY_PROTOCOL:SASL_SSL}" other: topic-properties: - rule-engine: "${TB_QUEUE_KAFKA_RE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}" - core: "${TB_QUEUE_KAFKA_CORE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}" - transport-api: "${TB_QUEUE_KAFKA_TA_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}" - notifications: "${TB_QUEUE_KAFKA_NOTIFICATIONS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}" - js-executor: "${TB_QUEUE_KAFKA_JE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:104857600}" + rule-engine: "${TB_QUEUE_KAFKA_RE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1}" + core: "${TB_QUEUE_KAFKA_CORE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1}" + transport-api: "${TB_QUEUE_KAFKA_TA_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1}" + notifications: "${TB_QUEUE_KAFKA_NOTIFICATIONS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1}" + js-executor: "${TB_QUEUE_KAFKA_JE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:104857600;partitions:100}" aws_sqs: use_default_credential_provider_chain: "${TB_QUEUE_AWS_SQS_USE_DEFAULT_CREDENTIAL_PROVIDER_CHAIN:false}" access_key_id: "${TB_QUEUE_AWS_SQS_ACCESS_KEY_ID:YOUR_KEY}" diff --git a/transport/http/src/main/resources/tb-http-transport.yml b/transport/http/src/main/resources/tb-http-transport.yml index a97a58cd85..77d5f30fa7 100644 --- a/transport/http/src/main/resources/tb-http-transport.yml +++ b/transport/http/src/main/resources/tb-http-transport.yml @@ -70,11 +70,11 @@ queue: security.protocol: "${TB_QUEUE_KAFKA_CONFLUENT_SECURITY_PROTOCOL:SASL_SSL}" other: topic-properties: - rule-engine: "${TB_QUEUE_KAFKA_RE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}" - core: "${TB_QUEUE_KAFKA_CORE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}" - transport-api: "${TB_QUEUE_KAFKA_TA_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}" - notifications: "${TB_QUEUE_KAFKA_NOTIFICATIONS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}" - js-executor: "${TB_QUEUE_KAFKA_JE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:104857600}" + rule-engine: "${TB_QUEUE_KAFKA_RE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1}" + core: "${TB_QUEUE_KAFKA_CORE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1}" + transport-api: "${TB_QUEUE_KAFKA_TA_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1}" + notifications: "${TB_QUEUE_KAFKA_NOTIFICATIONS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1}" + js-executor: "${TB_QUEUE_KAFKA_JE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:104857600;partitions:100}" aws_sqs: use_default_credential_provider_chain: "${TB_QUEUE_AWS_SQS_USE_DEFAULT_CREDENTIAL_PROVIDER_CHAIN:false}" access_key_id: "${TB_QUEUE_AWS_SQS_ACCESS_KEY_ID:YOUR_KEY}" diff --git a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml index b3804e3531..bc18112c8e 100644 --- a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml +++ b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml @@ -98,11 +98,11 @@ queue: security.protocol: "${TB_QUEUE_KAFKA_CONFLUENT_SECURITY_PROTOCOL:SASL_SSL}" other: topic-properties: - rule-engine: "${TB_QUEUE_KAFKA_RE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}" - core: "${TB_QUEUE_KAFKA_CORE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}" - transport-api: "${TB_QUEUE_KAFKA_TA_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}" - notifications: "${TB_QUEUE_KAFKA_NOTIFICATIONS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}" - js-executor: "${TB_QUEUE_KAFKA_JE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:104857600}" + rule-engine: "${TB_QUEUE_KAFKA_RE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1}" + core: "${TB_QUEUE_KAFKA_CORE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1}" + transport-api: "${TB_QUEUE_KAFKA_TA_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1}" + notifications: "${TB_QUEUE_KAFKA_NOTIFICATIONS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1}" + js-executor: "${TB_QUEUE_KAFKA_JE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:104857600;partitions:100}" aws_sqs: use_default_credential_provider_chain: "${TB_QUEUE_AWS_SQS_USE_DEFAULT_CREDENTIAL_PROVIDER_CHAIN:false}" access_key_id: "${TB_QUEUE_AWS_SQS_ACCESS_KEY_ID:YOUR_KEY}"