Merge remote-tracking branch 'origin/master' into develop/3.3-edge

Volodymyr Babak 2021-01-17 01:01:04 +02:00
commit 20a6719453
14 changed files with 54 additions and 51 deletions

View File

@@ -691,7 +691,7 @@ queue:
     linger.ms: "${TB_KAFKA_LINGER_MS:1}"
     buffer.memory: "${TB_BUFFER_MEMORY:33554432}"
     replication_factor: "${TB_QUEUE_KAFKA_REPLICATION_FACTOR:1}"
-    max_poll_interval_ms: "${TB_QUEUE_KAFKA_MAX_POLL_INTERVAL_MS:0}"
+    max_poll_interval_ms: "${TB_QUEUE_KAFKA_MAX_POLL_INTERVAL_MS:300000}"
     max_poll_records: "${TB_QUEUE_KAFKA_MAX_POLL_RECORDS:8192}"
     max_partition_fetch_bytes: "${TB_QUEUE_KAFKA_MAX_PARTITION_FETCH_BYTES:16777216}"
     fetch_max_bytes: "${TB_QUEUE_KAFKA_FETCH_MAX_BYTES:134217728}"
@@ -791,8 +791,6 @@ queue:
     max_requests_timeout: "${REMOTE_JS_MAX_REQUEST_TIMEOUT:10000}"
     # JS response poll interval
     response_poll_interval: "${REMOTE_JS_RESPONSE_POLL_INTERVAL_MS:25}"
-    # JS response auto commit interval
-    response_auto_commit_interval: "${REMOTE_JS_RESPONSE_AUTO_COMMIT_INTERVAL_MS:100}"
   rule-engine:
     topic: "${TB_QUEUE_RULE_ENGINE_TOPIC:tb_rule_engine}"
     poll-interval: "${TB_QUEUE_RULE_ENGINE_POLL_INTERVAL_MS:25}"
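
Context for the two changes above: TB_QUEUE_KAFKA_MAX_POLL_INTERVAL_MS now defaults to 300000 ms (Kafka's own max.poll.interval.ms default) instead of 0, which the old consumer code treated as "do not set the property at all"; REMOTE_JS_RESPONSE_AUTO_COMMIT_INTERVAL_MS is dropped because nothing reads it any more (see the TbQueueRemoteJsInvokeSettings change further down). A minimal sketch, not ThingsBoard code, of the consumer-side values these yml defaults resolve to when no environment overrides are set:

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

class KafkaConsumerDefaultsSketch {
    // Illustrative only: mirrors the defaults from the yml hunk above using the real
    // kafka-clients ConsumerConfig keys.
    static Properties defaults() {
        Properties props = new Properties();
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);       // was 0, i.e. "leave unset"
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 8192);
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 16777216);
        props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 134217728);
        return props;
    }
}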

View File

@@ -42,7 +42,7 @@ public class TbKafkaAdmin implements TbQueueAdmin {
     private final short replicationFactor;
     public TbKafkaAdmin(TbKafkaSettings settings, Map<String, String> topicConfigs) {
-        client = AdminClient.create(settings.toProps());
+        client = AdminClient.create(settings.toAdminProps());
         this.topicConfigs = topicConfigs;
         try {

View File

@@ -45,24 +45,14 @@ public class TbKafkaConsumerTemplate<T extends TbQueueMsg> extends AbstractTbQue
     @Builder
     private TbKafkaConsumerTemplate(TbKafkaSettings settings, TbKafkaDecoder<T> decoder,
                                     String clientId, String groupId, String topic,
-                                    boolean autoCommit, int autoCommitIntervalMs,
                                     TbQueueAdmin admin) {
         super(topic);
-        Properties props = settings.toProps();
+        Properties props = settings.toConsumerProps();
         props.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
         if (groupId != null) {
             props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
         }
-        if (settings.getMaxPollIntervalMs() > 0) {
-            props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, settings.getMaxPollIntervalMs());
-        }
-        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, settings.getMaxPollRecords());
-        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, settings.getMaxPartitionFetchBytes());
-        props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, settings.getFetchMaxBytes());
-        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
-        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitIntervalMs);
-        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
-        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
         this.admin = admin;
         this.consumer = new KafkaConsumer<>(props);
         this.decoder = decoder;
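
Net effect of this hunk: the per-consumer inputs (client id, group id, topic) stay in the builder, while the Kafka-level settings that used to be copied here from individual TbKafkaSettings getters — poll and fetch limits, deserializers, and the now-removed autoCommit / autoCommitIntervalMs pair — are assembled once in TbKafkaSettings.toConsumerProps(). A hedged sketch of the resulting builder call; the builder method names follow from Lombok's @Builder over the constructor parameters shown above, while kafkaSettings, myDecoder, myAdmin and MyMsg are illustrative stand-ins, not ThingsBoard classes:

// Illustrative only; the message type and the decoder/admin instances are placeholders.
TbKafkaConsumerTemplate<MyMsg> consumer = TbKafkaConsumerTemplate.<MyMsg>builder()
        .settings(kafkaSettings)   // poll/fetch limits and deserializers now come from toConsumerProps()
        .decoder(myDecoder)
        .clientId("tb-consumer-1")
        .groupId("tb-group")
        .topic("tb_core")
        .admin(myAdmin)
        .build();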

View File

@@ -55,9 +55,8 @@ public class TbKafkaProducerTemplate<T extends TbQueueMsg> implements TbQueuePro
     @Builder
     private TbKafkaProducerTemplate(TbKafkaSettings settings, String defaultTopic, String clientId, TbQueueAdmin admin) {
-        Properties props = settings.toProps();
-        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
-        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+        Properties props = settings.toProducerProps();
         if (!StringUtils.isEmpty(clientId)) {
             props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
         }

View File

@@ -19,9 +19,11 @@ import lombok.Getter;
 import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.springframework.beans.factory.annotation.Value;
-import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.boot.context.properties.ConfigurationProperties;
 import org.springframework.stereotype.Component;
@@ -32,7 +34,7 @@ import java.util.Properties;
  * Created by ashvayka on 25.09.18.
  */
 @Slf4j
-@ConditionalOnExpression("'${queue.type:null}'=='kafka'")
+@ConditionalOnProperty(prefix = "queue", value = "type", havingValue = "kafka")
 @ConfigurationProperties(prefix = "queue.kafka")
 @Component
 public class TbKafkaSettings {
@@ -60,19 +62,15 @@ public class TbKafkaSettings {
     private short replicationFactor;
     @Value("${queue.kafka.max_poll_records:8192}")
-    @Getter
     private int maxPollRecords;
-    @Value("${queue.kafka.max_poll_interval_ms:0}")
-    @Getter
+    @Value("${queue.kafka.max_poll_interval_ms:300000}")
     private int maxPollIntervalMs;
     @Value("${queue.kafka.max_partition_fetch_bytes:16777216}")
-    @Getter
     private int maxPartitionFetchBytes;
     @Value("${queue.kafka.fetch_max_bytes:134217728}")
-    @Getter
     private int fetchMaxBytes;
     @Value("${queue.kafka.use_confluent_cloud:false}")
@@ -93,21 +91,48 @@ public class TbKafkaSettings {
     @Setter
     private List<TbKafkaProperty> other;
-    public Properties toProps() {
-        Properties props = new Properties();
+    public Properties toAdminProps() {
+        Properties props = toProps();
+        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
+        props.put(AdminClientConfig.RETRIES_CONFIG, retries);
+        return props;
+    }
+    public Properties toConsumerProps() {
+        Properties props = toProps();
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
+        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
+        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, maxPartitionFetchBytes);
+        props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, fetchMaxBytes);
+        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervalMs);
+        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+        return props;
+    }
+    public Properties toProducerProps() {
+        Properties props = toProps();
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
         props.put(ProducerConfig.RETRIES_CONFIG, retries);
+        props.put(ProducerConfig.ACKS_CONFIG, acks);
+        props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
+        props.put(ProducerConfig.LINGER_MS_CONFIG, lingerMs);
+        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+        return props;
+    }
+    private Properties toProps() {
+        Properties props = new Properties();
         if (useConfluent) {
             props.put("ssl.endpoint.identification.algorithm", sslAlgorithm);
             props.put("sasl.mechanism", saslMechanism);
             props.put("sasl.jaas.config", saslConfig);
             props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol);
         } else {
             props.put(ProducerConfig.ACKS_CONFIG, acks);
             props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
             props.put(ProducerConfig.LINGER_MS_CONFIG, lingerMs);
             props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
         }
         if (other != null) {
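
In short, the single public toProps() is split into role-specific builders — toAdminProps(), toConsumerProps() and toProducerProps() — on top of a private toProps() that keeps the shared Confluent/security and custom-property handling, and the (de)serializer settings move here from the producer and consumer templates. A hedged usage sketch against the stock kafka-clients API; `settings` stands in for an injected TbKafkaSettings bean, and the topic name, partition count and replication factor are illustrative:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.thingsboard.server.queue.kafka.TbKafkaSettings;

class KafkaSettingsUsageSketch {

    void demo(TbKafkaSettings settings) {
        // Admin client: built from toAdminProps() (bootstrap servers + retries on top of the shared base).
        try (AdminClient admin = AdminClient.create(settings.toAdminProps())) {
            admin.createTopics(Collections.singletonList(new NewTopic("tb_core", 10, (short) 1)));
        }

        // Consumer: poll/fetch limits and String/byte[] deserializers are preset;
        // per-consumer details such as the group id are still added by the caller.
        Properties consumerProps = settings.toConsumerProps();
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "tb-core");
        try (KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(consumerProps)) {
            consumer.subscribe(Collections.singletonList("tb_core"));
            consumer.poll(Duration.ofMillis(300));
        }

        // Producer: acks, batching and String/byte[] serializers are preset.
        try (KafkaProducer<String, byte[]> producer = new KafkaProducer<>(settings.toProducerProps())) {
            producer.send(new ProducerRecord<>("tb_core", "key-1", new byte[0]));
        }
    }
}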

View File

@@ -17,7 +17,7 @@ package org.thingsboard.server.queue.kafka;
 import lombok.Getter;
 import org.springframework.beans.factory.annotation.Value;
-import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.stereotype.Component;
 import javax.annotation.PostConstruct;
@@ -25,7 +25,7 @@ import java.util.HashMap;
 import java.util.Map;
 @Component
-@ConditionalOnExpression("'${queue.type:null}'=='kafka'")
+@ConditionalOnProperty(prefix = "queue", value = "type", havingValue = "kafka")
 public class TbKafkaTopicConfigs {
     @Value("${queue.kafka.topic-properties.core}")
     private String coreProperties;

View File

@@ -34,9 +34,6 @@ public class TbQueueRemoteJsInvokeSettings {
     @Value("${queue.js.response_poll_interval}")
     private int responsePollInterval;
-    @Value("${queue.js.response_auto_commit_interval}")
-    private int autoCommitInterval;
     @Value("${queue.js.max_requests_timeout}")
     private long maxRequestsTimeout;
 }

View File

@@ -19,7 +19,7 @@ version: '2.2'
 services:
   kafka:
     restart: always
-    image: "wurstmeister/kafka:2.12-2.3.0"
+    image: "wurstmeister/kafka:2.13-2.6.0"
     ports:
       - "9092:9092"
     env_file:

View File

@@ -19,7 +19,7 @@
     "azure-sb": "^0.11.1",
     "config": "^3.3.1",
     "js-yaml": "^3.14.0",
-    "kafkajs": "^1.14.0",
+    "kafkajs": "^1.15.0",
     "long": "^4.0.0",
     "uuid-parse": "^1.1.0",
     "uuid-random": "^1.3.2",

View File

@@ -1665,10 +1665,10 @@ jws@^4.0.0:
     jwa "^2.0.0"
     safe-buffer "^5.0.1"
-kafkajs@^1.14.0:
-  version "1.14.0"
-  resolved "https://registry.yarnpkg.com/kafkajs/-/kafkajs-1.14.0.tgz#3d998a77bfde54dc502e8e88690eedf0b21a1ed6"
-  integrity sha512-W+WCekiooY5rJP3Me5N3gWcQ8O6uG6lw0vv9t+sI+WqXKjKwj2+CWIXJy241x+ITE+1M1D19ABSiL2J8lKja5A==
+kafkajs@^1.15.0:
+  version "1.15.0"
+  resolved "https://registry.yarnpkg.com/kafkajs/-/kafkajs-1.15.0.tgz#a5ada0d933edca2149177393562be6fb0875ec3a"
+  integrity sha512-yjPyEnQCkPxAuQLIJnY5dI+xnmmgXmhuOQ1GVxClG5KTOV/rJcW1qA3UfvyEJKTp/RTSqQnUR3HJsKFvHyTpNg==
 keyv@^3.0.0:
   version "3.1.0"

View File

@@ -94,7 +94,7 @@
         </sonar.exclusions>
         <elasticsearch.version>5.0.2</elasticsearch.version>
         <delight-nashorn-sandbox.version>0.1.14</delight-nashorn-sandbox.version>
-        <kafka.version>2.3.0</kafka.version>
+        <kafka.version>2.6.0</kafka.version>
         <bucket4j.version>4.1.1</bucket4j.version>
         <fst.version>2.57</fst.version>
         <antlr.version>2.7.7</antlr.version>

View File

@@ -160,8 +160,6 @@ queue:
     max_requests_timeout: "${REMOTE_JS_MAX_REQUEST_TIMEOUT:10000}"
     # JS response poll interval
     response_poll_interval: "${REMOTE_JS_RESPONSE_POLL_INTERVAL_MS:25}"
-    # JS response auto commit interval
-    response_auto_commit_interval: "${REMOTE_JS_RESPONSE_AUTO_COMMIT_INTERVAL_MS:100}"
   rule-engine:
     topic: "${TB_QUEUE_RULE_ENGINE_TOPIC:tb_rule_engine}"
     poll-interval: "${TB_QUEUE_RULE_ENGINE_POLL_INTERVAL_MS:25}"

View File

@@ -153,8 +153,6 @@ queue:
     max_requests_timeout: "${REMOTE_JS_MAX_REQUEST_TIMEOUT:10000}"
     # JS response poll interval
     response_poll_interval: "${REMOTE_JS_RESPONSE_POLL_INTERVAL_MS:25}"
-    # JS response auto commit interval
-    response_auto_commit_interval: "${REMOTE_JS_RESPONSE_AUTO_COMMIT_INTERVAL_MS:100}"
   rule-engine:
     topic: "${TB_QUEUE_RULE_ENGINE_TOPIC:tb_rule_engine}"
     poll-interval: "${TB_QUEUE_RULE_ENGINE_POLL_INTERVAL_MS:25}"

View File

@@ -182,8 +182,6 @@ queue:
     max_requests_timeout: "${REMOTE_JS_MAX_REQUEST_TIMEOUT:10000}"
     # JS response poll interval
     response_poll_interval: "${REMOTE_JS_RESPONSE_POLL_INTERVAL_MS:25}"
-    # JS response auto commit interval
-    response_auto_commit_interval: "${REMOTE_JS_RESPONSE_AUTO_COMMIT_INTERVAL_MS:100}"
   rule-engine:
     topic: "${TB_QUEUE_RULE_ENGINE_TOPIC:tb_rule_engine}"
     poll-interval: "${TB_QUEUE_RULE_ENGINE_POLL_INTERVAL_MS:25}"