diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 14bf47301b..1bc8c19a82 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -1018,6 +1018,8 @@ queue: max_poll_records: "${TB_QUEUE_KAFKA_MAX_POLL_RECORDS:8192}" max_partition_fetch_bytes: "${TB_QUEUE_KAFKA_MAX_PARTITION_FETCH_BYTES:16777216}" fetch_max_bytes: "${TB_QUEUE_KAFKA_FETCH_MAX_BYTES:134217728}" + request.timeout.ms: "${TB_QUEUE_KAFKA_REQUEST_TIMEOUT_MS:30000}" # (30 seconds) # refer to https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#producerconfigs_request.timeout.ms + session.timeout.ms: "${TB_QUEUE_KAFKA_SESSION_TIMEOUT_MS:10000}" # (10 seconds) # refer to https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#consumerconfigs_session.timeout.ms use_confluent_cloud: "${TB_QUEUE_KAFKA_USE_CONFLUENT_CLOUD:false}" confluent: ssl.algorithm: "${TB_QUEUE_KAFKA_CONFLUENT_SSL_ALGORITHM:https}" @@ -1036,11 +1038,12 @@ queue: # tb_rule_engine.sq: # - key: max.poll.records # value: "${TB_QUEUE_KAFKA_SQ_MAX_POLL_RECORDS:1024}" - other: # In this section you can specify custom parameters for Kafka consumer/producer and expose the env variables to configure outside - - key: "request.timeout.ms" # refer to https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#producerconfigs_request.timeout.ms - value: "${TB_QUEUE_KAFKA_REQUEST_TIMEOUT_MS:30000}" # (30 seconds) - - key: "session.timeout.ms" # refer to https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#consumerconfigs_session.timeout.ms - value: "${TB_QUEUE_KAFKA_SESSION_TIMEOUT_MS:10000}" # (10 seconds) + other-inline: "${TB_QUEUE_KAFKA_OTHER_PROPERTIES:}" # In this section you can specify custom parameters (semicolon separated) for Kafka consumer/producer/admin # Example "metrics.recording.level:INFO;metrics.sample.window.ms:30000" + other: # DEPRECATED. In this section you can specify custom parameters for Kafka consumer/producer and expose the env variables to configure outside + # - key: "request.timeout.ms" # refer to https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#producerconfigs_request.timeout.ms + # value: "${TB_QUEUE_KAFKA_REQUEST_TIMEOUT_MS:30000}" # (30 seconds) + # - key: "session.timeout.ms" # refer to https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#consumerconfigs_session.timeout.ms + # value: "${TB_QUEUE_KAFKA_SESSION_TIMEOUT_MS:10000}" # (10 seconds) topic-properties: rule-engine: "${TB_QUEUE_KAFKA_RE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}" core: "${TB_QUEUE_KAFKA_CORE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}" diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/azure/servicebus/TbServiceBusQueueConfigs.java b/common/queue/src/main/java/org/thingsboard/server/queue/azure/servicebus/TbServiceBusQueueConfigs.java index 6c56d55d36..5e404bc928 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/azure/servicebus/TbServiceBusQueueConfigs.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/azure/servicebus/TbServiceBusQueueConfigs.java @@ -19,10 +19,9 @@ import lombok.Getter; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.stereotype.Component; -import org.thingsboard.server.common.data.StringUtils; +import org.thingsboard.server.queue.util.PropertyUtils; import javax.annotation.PostConstruct; -import java.util.HashMap; import java.util.Map; @Component @@ -55,24 +54,12 @@ public class TbServiceBusQueueConfigs { @PostConstruct private void init() { - coreConfigs = getConfigs(coreProperties); - ruleEngineConfigs = getConfigs(ruleEngineProperties); - transportApiConfigs = getConfigs(transportApiProperties); - notificationsConfigs = getConfigs(notificationsProperties); - jsExecutorConfigs = getConfigs(jsExecutorProperties); - vcConfigs = getConfigs(vcProperties); + coreConfigs = PropertyUtils.getProps(coreProperties); + ruleEngineConfigs = PropertyUtils.getProps(ruleEngineProperties); + transportApiConfigs = PropertyUtils.getProps(transportApiProperties); + notificationsConfigs = PropertyUtils.getProps(notificationsProperties); + jsExecutorConfigs = PropertyUtils.getProps(jsExecutorProperties); + vcConfigs = PropertyUtils.getProps(vcProperties); } - private Map getConfigs(String properties) { - Map configs = new HashMap<>(); - if (StringUtils.isNotEmpty(properties)) { - for (String property : properties.split(";")) { - int delimiterPosition = property.indexOf(":"); - String key = property.substring(0, delimiterPosition); - String value = property.substring(delimiterPosition + 1); - configs.put(key, value); - } - } - return configs; - } } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaSettings.java b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaSettings.java index 5c63fe6e7d..7ed188491b 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaSettings.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaSettings.java @@ -32,6 +32,7 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.stereotype.Component; import org.thingsboard.server.common.data.TbProperty; +import org.thingsboard.server.queue.util.PropertyUtils; import java.util.Collections; import java.util.List; @@ -53,34 +54,34 @@ public class TbKafkaSettings { @Value("${queue.kafka.ssl.enabled:false}") private boolean sslEnabled; - @Value("${queue.kafka.ssl.truststore.location}") + @Value("${queue.kafka.ssl.truststore.location:}") private String sslTruststoreLocation; - @Value("${queue.kafka.ssl.truststore.password}") + @Value("${queue.kafka.ssl.truststore.password:}") private String sslTruststorePassword; - @Value("${queue.kafka.ssl.keystore.location}") + @Value("${queue.kafka.ssl.keystore.location:}") private String sslKeystoreLocation; - @Value("${queue.kafka.ssl.keystore.password}") + @Value("${queue.kafka.ssl.keystore.password:}") private String sslKeystorePassword; - @Value("${queue.kafka.ssl.key.password}") + @Value("${queue.kafka.ssl.key.password:}") private String sslKeyPassword; - @Value("${queue.kafka.acks}") + @Value("${queue.kafka.acks:all}") private String acks; - @Value("${queue.kafka.retries}") + @Value("${queue.kafka.retries:1}") private int retries; @Value("${queue.kafka.compression.type:none}") private String compressionType; - @Value("${queue.kafka.batch.size}") + @Value("${queue.kafka.batch.size:16384}") private int batchSize; - @Value("${queue.kafka.linger.ms}") + @Value("${queue.kafka.linger.ms:1}") private long lingerMs; @Value("${queue.kafka.max.request.size:1048576}") @@ -89,10 +90,10 @@ public class TbKafkaSettings { @Value("${queue.kafka.max.in.flight.requests.per.connection:5}") private int maxInFlightRequestsPerConnection; - @Value("${queue.kafka.buffer.memory}") + @Value("${queue.kafka.buffer.memory:33554432}") private long bufferMemory; - @Value("${queue.kafka.replication_factor}") + @Value("${queue.kafka.replication_factor:1}") @Getter private short replicationFactor; @@ -108,21 +109,31 @@ public class TbKafkaSettings { @Value("${queue.kafka.fetch_max_bytes:134217728}") private int fetchMaxBytes; + @Value("${queue.kafka.request.timeout.ms:30000}") + private int requestTimeoutMs; + + @Value("${queue.kafka.session.timeout.ms:10000}") + private int sessionTimeoutMs; + @Value("${queue.kafka.use_confluent_cloud:false}") private boolean useConfluent; - @Value("${queue.kafka.confluent.ssl.algorithm}") + @Value("${queue.kafka.confluent.ssl.algorithm:}") private String sslAlgorithm; - @Value("${queue.kafka.confluent.sasl.mechanism}") + @Value("${queue.kafka.confluent.sasl.mechanism:}") private String saslMechanism; - @Value("${queue.kafka.confluent.sasl.config}") + @Value("${queue.kafka.confluent.sasl.config:}") private String saslConfig; - @Value("${queue.kafka.confluent.security.protocol}") + @Value("${queue.kafka.confluent.security.protocol:}") private String securityProtocol; + @Value("${queue.kafka.other-inline:}") + private String otherInline; + + @Deprecated @Setter private List other; @@ -134,8 +145,6 @@ public class TbKafkaSettings { props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, servers); props.put(AdminClientConfig.RETRIES_CONFIG, retries); - configureSSL(props); - return props; } @@ -147,8 +156,6 @@ public class TbKafkaSettings { props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, fetchMaxBytes); props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervalMs); - configureSSL(props); - props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); @@ -174,7 +181,7 @@ public class TbKafkaSettings { return props; } - private Properties toProps() { + Properties toProps() { Properties props = new Properties(); if (useConfluent) { @@ -184,6 +191,11 @@ public class TbKafkaSettings { props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol); } + props.put(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG, requestTimeoutMs); + props.put(CommonClientConfigs.SESSION_TIMEOUT_MS_CONFIG, sessionTimeoutMs); + + props.putAll(PropertyUtils.getProps(otherInline)); + if (other != null) { other.forEach(kv -> props.put(kv.getKey(), kv.getValue())); } @@ -193,7 +205,7 @@ public class TbKafkaSettings { return props; } - private void configureSSL(Properties props) { + void configureSSL(Properties props) { if (sslEnabled) { props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL"); props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, sslTruststoreLocation); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaTopicConfigs.java b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaTopicConfigs.java index 64dd6f2720..9888066ee7 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaTopicConfigs.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaTopicConfigs.java @@ -19,10 +19,9 @@ import lombok.Getter; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.stereotype.Component; -import org.thingsboard.server.common.data.StringUtils; +import org.thingsboard.server.queue.util.PropertyUtils; import javax.annotation.PostConstruct; -import java.util.HashMap; import java.util.Map; @Component @@ -66,29 +65,17 @@ public class TbKafkaTopicConfigs { @PostConstruct private void init() { - coreConfigs = getConfigs(coreProperties); - ruleEngineConfigs = getConfigs(ruleEngineProperties); - transportApiRequestConfigs = getConfigs(transportApiProperties); - transportApiResponseConfigs = getConfigs(transportApiProperties); + coreConfigs = PropertyUtils.getProps(coreProperties); + ruleEngineConfigs = PropertyUtils.getProps(ruleEngineProperties); + transportApiRequestConfigs = PropertyUtils.getProps(transportApiProperties); + transportApiResponseConfigs = PropertyUtils.getProps(transportApiProperties); transportApiResponseConfigs.put(NUM_PARTITIONS_SETTING, "1"); - notificationsConfigs = getConfigs(notificationsProperties); - jsExecutorRequestConfigs = getConfigs(jsExecutorProperties); - jsExecutorResponseConfigs = getConfigs(jsExecutorProperties); + notificationsConfigs = PropertyUtils.getProps(notificationsProperties); + jsExecutorRequestConfigs = PropertyUtils.getProps(jsExecutorProperties); + jsExecutorResponseConfigs = PropertyUtils.getProps(jsExecutorProperties); jsExecutorResponseConfigs.put(NUM_PARTITIONS_SETTING, "1"); - fwUpdatesConfigs = getConfigs(fwUpdatesProperties); - vcConfigs = getConfigs(vcProperties); + fwUpdatesConfigs = PropertyUtils.getProps(fwUpdatesProperties); + vcConfigs = PropertyUtils.getProps(vcProperties); } - private Map getConfigs(String properties) { - Map configs = new HashMap<>(); - if (StringUtils.isNotEmpty(properties)) { - for (String property : properties.split(";")) { - int delimiterPosition = property.indexOf(":"); - String key = property.substring(0, delimiterPosition); - String value = property.substring(delimiterPosition + 1); - configs.put(key, value); - } - } - return configs; - } } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/pubsub/TbPubSubSubscriptionSettings.java b/common/queue/src/main/java/org/thingsboard/server/queue/pubsub/TbPubSubSubscriptionSettings.java index 0a65c7ca7d..3d7ab85ecc 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/pubsub/TbPubSubSubscriptionSettings.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/pubsub/TbPubSubSubscriptionSettings.java @@ -19,10 +19,9 @@ import lombok.Getter; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.stereotype.Component; -import org.thingsboard.server.common.data.StringUtils; +import org.thingsboard.server.queue.util.PropertyUtils; import javax.annotation.PostConstruct; -import java.util.HashMap; import java.util.Map; @Component @@ -56,24 +55,12 @@ public class TbPubSubSubscriptionSettings { @PostConstruct private void init() { - coreSettings = getSettings(coreProperties); - ruleEngineSettings = getSettings(ruleEngineProperties); - transportApiSettings = getSettings(transportApiProperties); - notificationsSettings = getSettings(notificationsProperties); - jsExecutorSettings = getSettings(jsExecutorProperties); - vcSettings = getSettings(vcProperties); + coreSettings = PropertyUtils.getProps(coreProperties); + ruleEngineSettings = PropertyUtils.getProps(ruleEngineProperties); + transportApiSettings = PropertyUtils.getProps(transportApiProperties); + notificationsSettings = PropertyUtils.getProps(notificationsProperties); + jsExecutorSettings = PropertyUtils.getProps(jsExecutorProperties); + vcSettings = PropertyUtils.getProps(vcProperties); } - private Map getSettings(String properties) { - Map configs = new HashMap<>(); - if (StringUtils.isNotEmpty(properties)) { - for (String property : properties.split(";")) { - int delimiterPosition = property.indexOf(":"); - String key = property.substring(0, delimiterPosition); - String value = property.substring(delimiterPosition + 1); - configs.put(key, value); - } - } - return configs; - } } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/util/PropertyUtils.java b/common/queue/src/main/java/org/thingsboard/server/queue/util/PropertyUtils.java new file mode 100644 index 0000000000..089d7f2219 --- /dev/null +++ b/common/queue/src/main/java/org/thingsboard/server/queue/util/PropertyUtils.java @@ -0,0 +1,40 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.queue.util; + +import org.thingsboard.server.common.data.StringUtils; + +import java.util.HashMap; +import java.util.Map; + +public class PropertyUtils { + + public static Map getProps(String properties) { + Map configs = new HashMap<>(); + if (StringUtils.isNotEmpty(properties)) { + for (String property : properties.split(";")) { + if (StringUtils.isNotEmpty(property)) { + int delimiterPosition = property.indexOf(":"); + String key = property.substring(0, delimiterPosition); + String value = property.substring(delimiterPosition + 1); + configs.put(key, value); + } + } + } + return configs; + } + +} diff --git a/common/queue/src/test/java/org/thingsboard/server/queue/kafka/TbKafkaSettingsTest.java b/common/queue/src/test/java/org/thingsboard/server/queue/kafka/TbKafkaSettingsTest.java new file mode 100644 index 0000000000..7e8c2d319d --- /dev/null +++ b/common/queue/src/test/java/org/thingsboard/server/queue/kafka/TbKafkaSettingsTest.java @@ -0,0 +1,83 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.queue.kafka; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.context.TestPropertySource; + +import java.util.Properties; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.spy; + +@SpringBootTest(classes = TbKafkaSettings.class) +@TestPropertySource(properties = { + "queue.type=kafka", + "queue.kafka.bootstrap.servers=localhost:9092", + "queue.kafka.other-inline=metrics.recording.level:INFO;metrics.sample.window.ms:30000", +}) +class TbKafkaSettingsTest { + + @Autowired + TbKafkaSettings settings; + + @BeforeEach + void beforeEach() { + settings = spy(settings); //SpyBean is not aware on @ConditionalOnProperty, that is why the traditional spy in use + } + + @Test + void givenToProps_whenConfigureSSL_thenVerifyOnce() { + Properties props = settings.toProps(); + + assertThat(props).as("TB_QUEUE_KAFKA_REQUEST_TIMEOUT_MS").containsEntry("request.timeout.ms", 30000); + assertThat(props).as("TB_QUEUE_KAFKA_SESSION_TIMEOUT_MS").containsEntry("session.timeout.ms", 10000); + + //other-inline + assertThat(props).as("metrics.recording.level").containsEntry("metrics.recording.level", "INFO"); + assertThat(props).as("TB_QUEUE_KAFKA_SESSION_TIMEOUT_MS").containsEntry("metrics.sample.window.ms", "30000"); + + Mockito.verify(settings).toProps(); + Mockito.verify(settings).configureSSL(any()); + } + + @Test + void givenToAdminProps_whenConfigureSSL_thenVerifyOnce() { + settings.toAdminProps(); + Mockito.verify(settings).toProps(); + Mockito.verify(settings).configureSSL(any()); + } + + @Test + void givenToConsumerProps_whenConfigureSSL_thenVerifyOnce() { + settings.toConsumerProps("main"); + Mockito.verify(settings).toProps(); + Mockito.verify(settings).configureSSL(any()); + } + + @Test + void givenTotoProducerProps_whenConfigureSSL_thenVerifyOnce() { + settings.toProducerProps(); + Mockito.verify(settings).toProps(); + Mockito.verify(settings).configureSSL(any()); + } + +} \ No newline at end of file diff --git a/common/queue/src/test/java/org/thingsboard/server/queue/util/PropertyUtilsTest.java b/common/queue/src/test/java/org/thingsboard/server/queue/util/PropertyUtilsTest.java new file mode 100644 index 0000000000..0484cb314f --- /dev/null +++ b/common/queue/src/test/java/org/thingsboard/server/queue/util/PropertyUtilsTest.java @@ -0,0 +1,62 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.queue.util; + +import org.junit.jupiter.api.Test; + +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + +class PropertyUtilsTest { + + @Test + void givenNullOrEmpty_whenGetConfig_thenEmptyMap() { + assertThat(PropertyUtils.getProps(null)).as("null property").isEmpty(); + assertThat(PropertyUtils.getProps("")).as("empty property").isEmpty(); + assertThat(PropertyUtils.getProps(";")).as("ends with ;").isEmpty(); + } + + @Test + void givenKafkaOtherProperties_whenGetConfig_thenReturnMappedValues() { + assertThat(PropertyUtils.getProps("metrics.recording.level:INFO;metrics.sample.window.ms:30000")) + .as("two pairs") + .isEqualTo(Map.of( + "metrics.recording.level", "INFO", + "metrics.sample.window.ms", "30000" + )); + + assertThat(PropertyUtils.getProps("metrics.recording.level:INFO;metrics.sample.window.ms:30000" + ";")) + .as("two pairs ends with ;") + .isEqualTo(Map.of( + "metrics.recording.level", "INFO", + "metrics.sample.window.ms", "30000" + )); + } + + @Test + void givenKafkaTopicProperties_whenGetConfig_thenReturnMappedValues() { + assertThat(PropertyUtils.getProps("retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1")) + .isEqualTo(Map.of( + "retention.ms", "604800000", + "segment.bytes", "26214400", + "retention.bytes", "1048576000", + "partitions", "1", + "min.insync.replicas", "1" + )); + } + +} diff --git a/msa/vc-executor/src/main/resources/tb-vc-executor.yml b/msa/vc-executor/src/main/resources/tb-vc-executor.yml index 2f73ab31f7..352f94e091 100644 --- a/msa/vc-executor/src/main/resources/tb-vc-executor.yml +++ b/msa/vc-executor/src/main/resources/tb-vc-executor.yml @@ -70,6 +70,8 @@ queue: max_poll_records: "${TB_QUEUE_KAFKA_MAX_POLL_RECORDS:8192}" max_partition_fetch_bytes: "${TB_QUEUE_KAFKA_MAX_PARTITION_FETCH_BYTES:16777216}" fetch_max_bytes: "${TB_QUEUE_KAFKA_FETCH_MAX_BYTES:134217728}" + request.timeout.ms: "${TB_QUEUE_KAFKA_REQUEST_TIMEOUT_MS:30000}" # (30 seconds) # refer to https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#producerconfigs_request.timeout.ms + session.timeout.ms: "${TB_QUEUE_KAFKA_SESSION_TIMEOUT_MS:10000}" # (10 seconds) # refer to https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#consumerconfigs_session.timeout.ms use_confluent_cloud: "${TB_QUEUE_KAFKA_USE_CONFLUENT_CLOUD:false}" confluent: ssl.algorithm: "${TB_QUEUE_KAFKA_CONFLUENT_SSL_ALGORITHM:https}" @@ -88,11 +90,12 @@ queue: # tb_rule_engine.sq: # - key: max.poll.records # value: "${TB_QUEUE_KAFKA_SQ_MAX_POLL_RECORDS:1024}" - other: # In this section you can specify custom parameters for Kafka consumer/producer and expose the env variables to configure outside - - key: "request.timeout.ms" # refer to https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#producerconfigs_request.timeout.ms - value: "${TB_QUEUE_KAFKA_REQUEST_TIMEOUT_MS:30000}" # (30 seconds) - - key: "session.timeout.ms" # refer to https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#consumerconfigs_session.timeout.ms - value: "${TB_QUEUE_KAFKA_SESSION_TIMEOUT_MS:10000}" # (10 seconds) + other-inline: "${TB_QUEUE_KAFKA_OTHER_PROPERTIES:}" # In this section you can specify custom parameters (semicolon separated) for Kafka consumer/producer/admin # Example "metrics.recording.level:INFO;metrics.sample.window.ms:30000" + other: # DEPRECATED. In this section you can specify custom parameters for Kafka consumer/producer and expose the env variables to configure outside + # - key: "request.timeout.ms" # refer to https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#producerconfigs_request.timeout.ms + # value: "${TB_QUEUE_KAFKA_REQUEST_TIMEOUT_MS:30000}" # (30 seconds) + # - key: "session.timeout.ms" # refer to https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#consumerconfigs_session.timeout.ms + # value: "${TB_QUEUE_KAFKA_SESSION_TIMEOUT_MS:10000}" # (10 seconds) topic-properties: core: "${TB_QUEUE_KAFKA_CORE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}" notifications: "${TB_QUEUE_KAFKA_NOTIFICATIONS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}" diff --git a/transport/coap/src/main/resources/tb-coap-transport.yml b/transport/coap/src/main/resources/tb-coap-transport.yml index 1bf71fee13..7ea553fe5c 100644 --- a/transport/coap/src/main/resources/tb-coap-transport.yml +++ b/transport/coap/src/main/resources/tb-coap-transport.yml @@ -167,17 +167,24 @@ queue: max.in.flight.requests.per.connection: "${TB_KAFKA_MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION:5}" buffer.memory: "${TB_BUFFER_MEMORY:33554432}" replication_factor: "${TB_QUEUE_KAFKA_REPLICATION_FACTOR:1}" + max_poll_interval_ms: "${TB_QUEUE_KAFKA_MAX_POLL_INTERVAL_MS:300000}" + max_poll_records: "${TB_QUEUE_KAFKA_MAX_POLL_RECORDS:8192}" + max_partition_fetch_bytes: "${TB_QUEUE_KAFKA_MAX_PARTITION_FETCH_BYTES:16777216}" + fetch_max_bytes: "${TB_QUEUE_KAFKA_FETCH_MAX_BYTES:134217728}" + request.timeout.ms: "${TB_QUEUE_KAFKA_REQUEST_TIMEOUT_MS:30000}" # (30 seconds) # refer to https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#producerconfigs_request.timeout.ms + session.timeout.ms: "${TB_QUEUE_KAFKA_SESSION_TIMEOUT_MS:10000}" # (10 seconds) # refer to https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#consumerconfigs_session.timeout.ms use_confluent_cloud: "${TB_QUEUE_KAFKA_USE_CONFLUENT_CLOUD:false}" confluent: ssl.algorithm: "${TB_QUEUE_KAFKA_CONFLUENT_SSL_ALGORITHM:https}" sasl.mechanism: "${TB_QUEUE_KAFKA_CONFLUENT_SASL_MECHANISM:PLAIN}" sasl.config: "${TB_QUEUE_KAFKA_CONFLUENT_SASL_JAAS_CONFIG:org.apache.kafka.common.security.plain.PlainLoginModule required username=\"CLUSTER_API_KEY\" password=\"CLUSTER_API_SECRET\";}" security.protocol: "${TB_QUEUE_KAFKA_CONFLUENT_SECURITY_PROTOCOL:SASL_SSL}" - other: # In this section you can specify custom parameters for Kafka consumer/producer and expose the env variables to configure outside - - key: "request.timeout.ms" # refer to https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#producerconfigs_request.timeout.ms - value: "${TB_QUEUE_KAFKA_REQUEST_TIMEOUT_MS:30000}" # (30 seconds) - - key: "session.timeout.ms" # refer to https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#consumerconfigs_session.timeout.ms - value: "${TB_QUEUE_KAFKA_SESSION_TIMEOUT_MS:10000}" # (10 seconds) + other-inline: "${TB_QUEUE_KAFKA_OTHER_PROPERTIES:}" # In this section you can specify custom parameters (semicolon separated) for Kafka consumer/producer/admin # Example "metrics.recording.level:INFO;metrics.sample.window.ms:30000" + other: # DEPRECATED. In this section you can specify custom parameters for Kafka consumer/producer and expose the env variables to configure outside + # - key: "request.timeout.ms" # refer to https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#producerconfigs_request.timeout.ms + # value: "${TB_QUEUE_KAFKA_REQUEST_TIMEOUT_MS:30000}" # (30 seconds) + # - key: "session.timeout.ms" # refer to https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#consumerconfigs_session.timeout.ms + # value: "${TB_QUEUE_KAFKA_SESSION_TIMEOUT_MS:10000}" # (10 seconds) topic-properties: rule-engine: "${TB_QUEUE_KAFKA_RE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}" core: "${TB_QUEUE_KAFKA_CORE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}" diff --git a/transport/http/src/main/resources/tb-http-transport.yml b/transport/http/src/main/resources/tb-http-transport.yml index caeb1bfc2a..346ec48eae 100644 --- a/transport/http/src/main/resources/tb-http-transport.yml +++ b/transport/http/src/main/resources/tb-http-transport.yml @@ -152,17 +152,24 @@ queue: max.in.flight.requests.per.connection: "${TB_KAFKA_MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION:5}" buffer.memory: "${TB_BUFFER_MEMORY:33554432}" replication_factor: "${TB_QUEUE_KAFKA_REPLICATION_FACTOR:1}" + max_poll_interval_ms: "${TB_QUEUE_KAFKA_MAX_POLL_INTERVAL_MS:300000}" + max_poll_records: "${TB_QUEUE_KAFKA_MAX_POLL_RECORDS:8192}" + max_partition_fetch_bytes: "${TB_QUEUE_KAFKA_MAX_PARTITION_FETCH_BYTES:16777216}" + fetch_max_bytes: "${TB_QUEUE_KAFKA_FETCH_MAX_BYTES:134217728}" + request.timeout.ms: "${TB_QUEUE_KAFKA_REQUEST_TIMEOUT_MS:30000}" # (30 seconds) # refer to https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#producerconfigs_request.timeout.ms + session.timeout.ms: "${TB_QUEUE_KAFKA_SESSION_TIMEOUT_MS:10000}" # (10 seconds) # refer to https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#consumerconfigs_session.timeout.ms use_confluent_cloud: "${TB_QUEUE_KAFKA_USE_CONFLUENT_CLOUD:false}" confluent: ssl.algorithm: "${TB_QUEUE_KAFKA_CONFLUENT_SSL_ALGORITHM:https}" sasl.mechanism: "${TB_QUEUE_KAFKA_CONFLUENT_SASL_MECHANISM:PLAIN}" sasl.config: "${TB_QUEUE_KAFKA_CONFLUENT_SASL_JAAS_CONFIG:org.apache.kafka.common.security.plain.PlainLoginModule required username=\"CLUSTER_API_KEY\" password=\"CLUSTER_API_SECRET\";}" security.protocol: "${TB_QUEUE_KAFKA_CONFLUENT_SECURITY_PROTOCOL:SASL_SSL}" - other: # In this section you can specify custom parameters for Kafka consumer/producer and expose the env variables to configure outside - - key: "request.timeout.ms" # refer to https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#producerconfigs_request.timeout.ms - value: "${TB_QUEUE_KAFKA_REQUEST_TIMEOUT_MS:30000}" # (30 seconds) - - key: "session.timeout.ms" # refer to https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#consumerconfigs_session.timeout.ms - value: "${TB_QUEUE_KAFKA_SESSION_TIMEOUT_MS:10000}" # (10 seconds) + other-inline: "${TB_QUEUE_KAFKA_OTHER_PROPERTIES:}" # In this section you can specify custom parameters (semicolon separated) for Kafka consumer/producer/admin # Example "metrics.recording.level:INFO;metrics.sample.window.ms:30000" + other: # DEPRECATED. In this section you can specify custom parameters for Kafka consumer/producer and expose the env variables to configure outside + # - key: "request.timeout.ms" # refer to https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#producerconfigs_request.timeout.ms + # value: "${TB_QUEUE_KAFKA_REQUEST_TIMEOUT_MS:30000}" # (30 seconds) + # - key: "session.timeout.ms" # refer to https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#consumerconfigs_session.timeout.ms + # value: "${TB_QUEUE_KAFKA_SESSION_TIMEOUT_MS:10000}" # (10 seconds) topic-properties: rule-engine: "${TB_QUEUE_KAFKA_RE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}" core: "${TB_QUEUE_KAFKA_CORE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}" diff --git a/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml b/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml index f930836d16..4e8167d89d 100644 --- a/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml +++ b/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml @@ -233,17 +233,24 @@ queue: max.in.flight.requests.per.connection: "${TB_KAFKA_MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION:5}" buffer.memory: "${TB_BUFFER_MEMORY:33554432}" replication_factor: "${TB_QUEUE_KAFKA_REPLICATION_FACTOR:1}" + max_poll_interval_ms: "${TB_QUEUE_KAFKA_MAX_POLL_INTERVAL_MS:300000}" + max_poll_records: "${TB_QUEUE_KAFKA_MAX_POLL_RECORDS:8192}" + max_partition_fetch_bytes: "${TB_QUEUE_KAFKA_MAX_PARTITION_FETCH_BYTES:16777216}" + fetch_max_bytes: "${TB_QUEUE_KAFKA_FETCH_MAX_BYTES:134217728}" + request.timeout.ms: "${TB_QUEUE_KAFKA_REQUEST_TIMEOUT_MS:30000}" # (30 seconds) # refer to https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#producerconfigs_request.timeout.ms + session.timeout.ms: "${TB_QUEUE_KAFKA_SESSION_TIMEOUT_MS:10000}" # (10 seconds) # refer to https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#consumerconfigs_session.timeout.ms use_confluent_cloud: "${TB_QUEUE_KAFKA_USE_CONFLUENT_CLOUD:false}" confluent: ssl.algorithm: "${TB_QUEUE_KAFKA_CONFLUENT_SSL_ALGORITHM:https}" sasl.mechanism: "${TB_QUEUE_KAFKA_CONFLUENT_SASL_MECHANISM:PLAIN}" sasl.config: "${TB_QUEUE_KAFKA_CONFLUENT_SASL_JAAS_CONFIG:org.apache.kafka.common.security.plain.PlainLoginModule required username=\"CLUSTER_API_KEY\" password=\"CLUSTER_API_SECRET\";}" security.protocol: "${TB_QUEUE_KAFKA_CONFLUENT_SECURITY_PROTOCOL:SASL_SSL}" - other: # In this section you can specify custom parameters for Kafka consumer/producer and expose the env variables to configure outside - - key: "request.timeout.ms" # refer to https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#producerconfigs_request.timeout.ms - value: "${TB_QUEUE_KAFKA_REQUEST_TIMEOUT_MS:30000}" # (30 seconds) - - key: "session.timeout.ms" # refer to https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#consumerconfigs_session.timeout.ms - value: "${TB_QUEUE_KAFKA_SESSION_TIMEOUT_MS:10000}" # (10 seconds) + other-inline: "${TB_QUEUE_KAFKA_OTHER_PROPERTIES:}" # In this section you can specify custom parameters (semicolon separated) for Kafka consumer/producer/admin # Example "metrics.recording.level:INFO;metrics.sample.window.ms:30000" + other: # DEPRECATED. In this section you can specify custom parameters for Kafka consumer/producer and expose the env variables to configure outside + # - key: "request.timeout.ms" # refer to https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#producerconfigs_request.timeout.ms + # value: "${TB_QUEUE_KAFKA_REQUEST_TIMEOUT_MS:30000}" # (30 seconds) + # - key: "session.timeout.ms" # refer to https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#consumerconfigs_session.timeout.ms + # value: "${TB_QUEUE_KAFKA_SESSION_TIMEOUT_MS:10000}" # (10 seconds) topic-properties: rule-engine: "${TB_QUEUE_KAFKA_RE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}" core: "${TB_QUEUE_KAFKA_CORE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}" diff --git a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml index ec60357764..1e0b1ebcd4 100644 --- a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml +++ b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml @@ -182,17 +182,24 @@ queue: max.in.flight.requests.per.connection: "${TB_KAFKA_MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION:5}" buffer.memory: "${TB_BUFFER_MEMORY:33554432}" replication_factor: "${TB_QUEUE_KAFKA_REPLICATION_FACTOR:1}" + max_poll_interval_ms: "${TB_QUEUE_KAFKA_MAX_POLL_INTERVAL_MS:300000}" + max_poll_records: "${TB_QUEUE_KAFKA_MAX_POLL_RECORDS:8192}" + max_partition_fetch_bytes: "${TB_QUEUE_KAFKA_MAX_PARTITION_FETCH_BYTES:16777216}" + fetch_max_bytes: "${TB_QUEUE_KAFKA_FETCH_MAX_BYTES:134217728}" + request.timeout.ms: "${TB_QUEUE_KAFKA_REQUEST_TIMEOUT_MS:30000}" # (30 seconds) # refer to https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#producerconfigs_request.timeout.ms + session.timeout.ms: "${TB_QUEUE_KAFKA_SESSION_TIMEOUT_MS:10000}" # (10 seconds) # refer to https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#consumerconfigs_session.timeout.ms use_confluent_cloud: "${TB_QUEUE_KAFKA_USE_CONFLUENT_CLOUD:false}" confluent: ssl.algorithm: "${TB_QUEUE_KAFKA_CONFLUENT_SSL_ALGORITHM:https}" sasl.mechanism: "${TB_QUEUE_KAFKA_CONFLUENT_SASL_MECHANISM:PLAIN}" sasl.config: "${TB_QUEUE_KAFKA_CONFLUENT_SASL_JAAS_CONFIG:org.apache.kafka.common.security.plain.PlainLoginModule required username=\"CLUSTER_API_KEY\" password=\"CLUSTER_API_SECRET\";}" security.protocol: "${TB_QUEUE_KAFKA_CONFLUENT_SECURITY_PROTOCOL:SASL_SSL}" - other: # In this section you can specify custom parameters for Kafka consumer/producer and expose the env variables to configure outside - - key: "request.timeout.ms" # refer to https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#producerconfigs_request.timeout.ms - value: "${TB_QUEUE_KAFKA_REQUEST_TIMEOUT_MS:30000}" # (30 seconds) - - key: "session.timeout.ms" # refer to https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#consumerconfigs_session.timeout.ms - value: "${TB_QUEUE_KAFKA_SESSION_TIMEOUT_MS:10000}" # (10 seconds) + other-inline: "${TB_QUEUE_KAFKA_OTHER_PROPERTIES:}" # In this section you can specify custom parameters (semicolon separated) for Kafka consumer/producer/admin # Example "metrics.recording.level:INFO;metrics.sample.window.ms:30000" + other: # DEPRECATED. In this section you can specify custom parameters for Kafka consumer/producer and expose the env variables to configure outside + # - key: "request.timeout.ms" # refer to https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#producerconfigs_request.timeout.ms + # value: "${TB_QUEUE_KAFKA_REQUEST_TIMEOUT_MS:30000}" # (30 seconds) + # - key: "session.timeout.ms" # refer to https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#consumerconfigs_session.timeout.ms + # value: "${TB_QUEUE_KAFKA_SESSION_TIMEOUT_MS:10000}" # (10 seconds) topic-properties: rule-engine: "${TB_QUEUE_KAFKA_RE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}" core: "${TB_QUEUE_KAFKA_CORE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}" diff --git a/transport/snmp/src/main/resources/tb-snmp-transport.yml b/transport/snmp/src/main/resources/tb-snmp-transport.yml index 169e464e57..9f086bcbc5 100644 --- a/transport/snmp/src/main/resources/tb-snmp-transport.yml +++ b/transport/snmp/src/main/resources/tb-snmp-transport.yml @@ -128,17 +128,24 @@ queue: max.in.flight.requests.per.connection: "${TB_KAFKA_MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION:5}" buffer.memory: "${TB_BUFFER_MEMORY:33554432}" replication_factor: "${TB_QUEUE_KAFKA_REPLICATION_FACTOR:1}" + max_poll_interval_ms: "${TB_QUEUE_KAFKA_MAX_POLL_INTERVAL_MS:300000}" + max_poll_records: "${TB_QUEUE_KAFKA_MAX_POLL_RECORDS:8192}" + max_partition_fetch_bytes: "${TB_QUEUE_KAFKA_MAX_PARTITION_FETCH_BYTES:16777216}" + fetch_max_bytes: "${TB_QUEUE_KAFKA_FETCH_MAX_BYTES:134217728}" + request.timeout.ms: "${TB_QUEUE_KAFKA_REQUEST_TIMEOUT_MS:30000}" # (30 seconds) # refer to https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#producerconfigs_request.timeout.ms + session.timeout.ms: "${TB_QUEUE_KAFKA_SESSION_TIMEOUT_MS:10000}" # (10 seconds) # refer to https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#consumerconfigs_session.timeout.ms use_confluent_cloud: "${TB_QUEUE_KAFKA_USE_CONFLUENT_CLOUD:false}" confluent: ssl.algorithm: "${TB_QUEUE_KAFKA_CONFLUENT_SSL_ALGORITHM:https}" sasl.mechanism: "${TB_QUEUE_KAFKA_CONFLUENT_SASL_MECHANISM:PLAIN}" sasl.config: "${TB_QUEUE_KAFKA_CONFLUENT_SASL_JAAS_CONFIG:org.apache.kafka.common.security.plain.PlainLoginModule required username=\"CLUSTER_API_KEY\" password=\"CLUSTER_API_SECRET\";}" security.protocol: "${TB_QUEUE_KAFKA_CONFLUENT_SECURITY_PROTOCOL:SASL_SSL}" - other: # In this section you can specify custom parameters for Kafka consumer/producer and expose the env variables to configure outside - - key: "request.timeout.ms" # refer to https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#producerconfigs_request.timeout.ms - value: "${TB_QUEUE_KAFKA_REQUEST_TIMEOUT_MS:30000}" # (30 seconds) - - key: "session.timeout.ms" # refer to https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#consumerconfigs_session.timeout.ms - value: "${TB_QUEUE_KAFKA_SESSION_TIMEOUT_MS:10000}" # (10 seconds) + other-inline: "${TB_QUEUE_KAFKA_OTHER_PROPERTIES:}" # In this section you can specify custom parameters (semicolon separated) for Kafka consumer/producer/admin # Example "metrics.recording.level:INFO;metrics.sample.window.ms:30000" + other: # DEPRECATED. In this section you can specify custom parameters for Kafka consumer/producer and expose the env variables to configure outside + # - key: "request.timeout.ms" # refer to https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#producerconfigs_request.timeout.ms + # value: "${TB_QUEUE_KAFKA_REQUEST_TIMEOUT_MS:30000}" # (30 seconds) + # - key: "session.timeout.ms" # refer to https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#consumerconfigs_session.timeout.ms + # value: "${TB_QUEUE_KAFKA_SESSION_TIMEOUT_MS:10000}" # (10 seconds) topic-properties: rule-engine: "${TB_QUEUE_KAFKA_RE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}" core: "${TB_QUEUE_KAFKA_CORE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"