fixed passed topic name for consumer constructor in rule engine queue

This commit is contained in:
Dima Landiak 2021-10-27 11:26:46 +03:00
parent 330ff09437
commit 0745ee3d0a
12 changed files with 17 additions and 12 deletions

View File

@ -866,10 +866,15 @@ queue:
sasl.mechanism: "${TB_QUEUE_KAFKA_CONFLUENT_SASL_MECHANISM:PLAIN}"
sasl.config: "${TB_QUEUE_KAFKA_CONFLUENT_SASL_JAAS_CONFIG:org.apache.kafka.common.security.plain.PlainLoginModule required username=\"CLUSTER_API_KEY\" password=\"CLUSTER_API_SECRET\";}"
security.protocol: "${TB_QUEUE_KAFKA_CONFLUENT_SECURITY_PROTOCOL:SASL_SSL}"
# Key-value properties for Kafka consumer per specific topic, e.g. tb_ota_package is a topic name for ota, tb_rule_engine.sq is a topic name for default SequentialByOriginator queue.
# Check TB_QUEUE_CORE_OTA_TOPIC and TB_QUEUE_RE_SQ_TOPIC params
consumer-properties-per-topic:
tb_ota_package:
- key: max.poll.records
value: 10
value: "${TB_QUEUE_KAFKA_OTA_MAX_POLL_RECORDS:10}"
# tb_rule_engine.sq:
# - key: max.poll.records
# value: "${TB_QUEUE_KAFKA_SQ_MAX_POLL_RECORDS:1024}"
other: # In this section you can specify custom parameters for Kafka consumer/producer and expose the env variables to configure outside
- key: "request.timeout.ms" # refer to https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#producerconfigs_request.timeout.ms
value: "${TB_QUEUE_KAFKA_REQUEST_TIMEOUT_MS:30000}" # (30 seconds)

View File

@ -117,7 +117,7 @@ public class AwsSqsMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEng
@Override
public TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> createToRuleEngineMsgConsumer(TbRuleEngineQueueConfiguration configuration) {
return new TbAwsSqsConsumerTemplate<>(ruleEngineAdmin, sqsSettings, ruleEngineSettings.getTopic(),
return new TbAwsSqsConsumerTemplate<>(ruleEngineAdmin, sqsSettings, configuration.getTopic(),
msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders()));
}

View File

@ -109,7 +109,7 @@ public class AwsSqsTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory
@Override
public TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> createToRuleEngineMsgConsumer(TbRuleEngineQueueConfiguration configuration) {
return new TbAwsSqsConsumerTemplate<>(ruleEngineAdmin, sqsSettings, ruleEngineSettings.getTopic(),
return new TbAwsSqsConsumerTemplate<>(ruleEngineAdmin, sqsSettings, configuration.getTopic(),
msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders()));
}

View File

@ -92,7 +92,7 @@ public class InMemoryMonolithQueueFactory implements TbCoreQueueFactory, TbRuleE
@Override
public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> createToRuleEngineMsgConsumer(TbRuleEngineQueueConfiguration configuration) {
return new InMemoryTbQueueConsumer<>(ruleEngineSettings.getTopic());
return new InMemoryTbQueueConsumer<>(configuration.getTopic());
}
@Override

View File

@ -160,7 +160,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
String queueName = configuration.getName();
TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder<TbProtoQueueMsg<ToRuleEngineMsg>> consumerBuilder = TbKafkaConsumerTemplate.builder();
consumerBuilder.settings(kafkaSettings);
consumerBuilder.topic(ruleEngineSettings.getTopic());
consumerBuilder.topic(configuration.getTopic());
consumerBuilder.clientId("re-" + queueName + "-consumer-" + serviceInfoProvider.getServiceId() + "-" + consumerCount.incrementAndGet());
consumerBuilder.groupId("re-" + queueName + "-consumer");
consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders()));

View File

@ -159,7 +159,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
String queueName = configuration.getName();
TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder<TbProtoQueueMsg<ToRuleEngineMsg>> consumerBuilder = TbKafkaConsumerTemplate.builder();
consumerBuilder.settings(kafkaSettings);
consumerBuilder.topic(ruleEngineSettings.getTopic());
consumerBuilder.topic(configuration.getTopic());
consumerBuilder.clientId("re-" + queueName + "-consumer-" + serviceInfoProvider.getServiceId() + "-" + consumerCount.incrementAndGet());
consumerBuilder.groupId("re-" + queueName + "-consumer");
consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders()));

View File

@ -127,7 +127,7 @@ public class PubSubMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEng
@Override
public TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> createToRuleEngineMsgConsumer(TbRuleEngineQueueConfiguration configuration) {
return new TbPubSubConsumerTemplate<>(ruleEngineAdmin, pubSubSettings, ruleEngineSettings.getTopic(),
return new TbPubSubConsumerTemplate<>(ruleEngineAdmin, pubSubSettings, configuration.getTopic(),
msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders()));
}

View File

@ -114,7 +114,7 @@ public class PubSubTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory
@Override
public TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> createToRuleEngineMsgConsumer(TbRuleEngineQueueConfiguration configuration) {
return new TbPubSubConsumerTemplate<>(ruleEngineAdmin, pubSubSettings, ruleEngineSettings.getTopic(),
return new TbPubSubConsumerTemplate<>(ruleEngineAdmin, pubSubSettings, configuration.getTopic(),
msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders()));
}

View File

@ -125,7 +125,7 @@ public class RabbitMqMonolithQueueFactory implements TbCoreQueueFactory, TbRuleE
@Override
public TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> createToRuleEngineMsgConsumer(TbRuleEngineQueueConfiguration configuration) {
return new TbRabbitMqConsumerTemplate<>(ruleEngineAdmin, rabbitMqSettings, ruleEngineSettings.getTopic(),
return new TbRabbitMqConsumerTemplate<>(ruleEngineAdmin, rabbitMqSettings, configuration.getTopic(),
msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders()));
}

View File

@ -112,7 +112,7 @@ public class RabbitMqTbRuleEngineQueueFactory implements TbRuleEngineQueueFactor
@Override
public TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> createToRuleEngineMsgConsumer(TbRuleEngineQueueConfiguration configuration) {
return new TbRabbitMqConsumerTemplate<>(ruleEngineAdmin, rabbitMqSettings, ruleEngineSettings.getTopic(),
return new TbRabbitMqConsumerTemplate<>(ruleEngineAdmin, rabbitMqSettings, configuration.getTopic(),
msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders()));
}

View File

@ -124,7 +124,7 @@ public class ServiceBusMonolithQueueFactory implements TbCoreQueueFactory, TbRul
@Override
public TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> createToRuleEngineMsgConsumer(TbRuleEngineQueueConfiguration configuration) {
return new TbServiceBusConsumerTemplate<>(ruleEngineAdmin, serviceBusSettings, ruleEngineSettings.getTopic(),
return new TbServiceBusConsumerTemplate<>(ruleEngineAdmin, serviceBusSettings, configuration.getTopic(),
msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders()));
}

View File

@ -112,7 +112,7 @@ public class ServiceBusTbRuleEngineQueueFactory implements TbRuleEngineQueueFact
@Override
public TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> createToRuleEngineMsgConsumer(TbRuleEngineQueueConfiguration configuration) {
return new TbServiceBusConsumerTemplate<>(ruleEngineAdmin, serviceBusSettings, ruleEngineSettings.getTopic(),
return new TbServiceBusConsumerTemplate<>(ruleEngineAdmin, serviceBusSettings, configuration.getTopic(),
msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders()));
}