Single Partition for Kafka response topics. 10 partitions for transport-api by default

This commit is contained in:
Andrii Shvaika 2022-07-13 14:40:44 +03:00
parent 438f90f3ec
commit 37533239b4
12 changed files with 94 additions and 55 deletions

View File

@ -943,7 +943,7 @@ queue:
topic-properties: topic-properties:
rule-engine: "${TB_QUEUE_KAFKA_RE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}" rule-engine: "${TB_QUEUE_KAFKA_RE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
core: "${TB_QUEUE_KAFKA_CORE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}" core: "${TB_QUEUE_KAFKA_CORE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
transport-api: "${TB_QUEUE_KAFKA_TA_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}" transport-api: "${TB_QUEUE_KAFKA_TA_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:10;min.insync.replicas:1}"
notifications: "${TB_QUEUE_KAFKA_NOTIFICATIONS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}" notifications: "${TB_QUEUE_KAFKA_NOTIFICATIONS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
js-executor: "${TB_QUEUE_KAFKA_JE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:104857600;partitions:100;min.insync.replicas:1}" js-executor: "${TB_QUEUE_KAFKA_JE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:104857600;partitions:100;min.insync.replicas:1}"
ota-updates: "${TB_QUEUE_KAFKA_OTA_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:10;min.insync.replicas:1}" ota-updates: "${TB_QUEUE_KAFKA_OTA_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:10;min.insync.replicas:1}"

View File

@ -51,7 +51,7 @@ public class TbKafkaAdmin implements TbQueueAdmin {
log.error("Failed to get all topics.", e); log.error("Failed to get all topics.", e);
} }
String numPartitionsStr = topicConfigs.get("partitions"); String numPartitionsStr = topicConfigs.get(TbKafkaTopicConfigs.NUM_PARTITIONS_SETTING);
if (numPartitionsStr != null) { if (numPartitionsStr != null) {
numPartitions = Integer.parseInt(numPartitionsStr); numPartitions = Integer.parseInt(numPartitionsStr);
topicConfigs.remove("partitions"); topicConfigs.remove("partitions");

View File

@ -28,6 +28,8 @@ import java.util.Map;
@Component @Component
@ConditionalOnProperty(prefix = "queue", value = "type", havingValue = "kafka") @ConditionalOnProperty(prefix = "queue", value = "type", havingValue = "kafka")
public class TbKafkaTopicConfigs { public class TbKafkaTopicConfigs {
public static final String NUM_PARTITIONS_SETTING = "partitions";
@Value("${queue.kafka.topic-properties.core:}") @Value("${queue.kafka.topic-properties.core:}")
private String coreProperties; private String coreProperties;
@Value("${queue.kafka.topic-properties.rule-engine:}") @Value("${queue.kafka.topic-properties.rule-engine:}")
@ -43,17 +45,20 @@ public class TbKafkaTopicConfigs {
@Value("${queue.kafka.topic-properties.version-control:}") @Value("${queue.kafka.topic-properties.version-control:}")
private String vcProperties; private String vcProperties;
@Getter @Getter
private Map<String, String> coreConfigs; private Map<String, String> coreConfigs;
@Getter @Getter
private Map<String, String> ruleEngineConfigs; private Map<String, String> ruleEngineConfigs;
@Getter @Getter
private Map<String, String> transportApiConfigs; private Map<String, String> transportApiRequestConfigs;
@Getter
private Map<String, String> transportApiResponseConfigs;
@Getter @Getter
private Map<String, String> notificationsConfigs; private Map<String, String> notificationsConfigs;
@Getter @Getter
private Map<String, String> jsExecutorConfigs; private Map<String, String> jsExecutorRequestConfigs;
@Getter
private Map<String, String> jsExecutorResponseConfigs;
@Getter @Getter
private Map<String, String> fwUpdatesConfigs; private Map<String, String> fwUpdatesConfigs;
@Getter @Getter
@ -63,9 +68,13 @@ public class TbKafkaTopicConfigs {
private void init() { private void init() {
coreConfigs = getConfigs(coreProperties); coreConfigs = getConfigs(coreProperties);
ruleEngineConfigs = getConfigs(ruleEngineProperties); ruleEngineConfigs = getConfigs(ruleEngineProperties);
transportApiConfigs = getConfigs(transportApiProperties); transportApiRequestConfigs = getConfigs(transportApiProperties);
transportApiResponseConfigs = getConfigs(transportApiProperties);
transportApiResponseConfigs.put(NUM_PARTITIONS_SETTING, "1");
notificationsConfigs = getConfigs(notificationsProperties); notificationsConfigs = getConfigs(notificationsProperties);
jsExecutorConfigs = getConfigs(jsExecutorProperties); jsExecutorRequestConfigs = getConfigs(jsExecutorProperties);
jsExecutorResponseConfigs = getConfigs(jsExecutorProperties);
jsExecutorResponseConfigs.put(NUM_PARTITIONS_SETTING, "1");
fwUpdatesConfigs = getConfigs(fwUpdatesProperties); fwUpdatesConfigs = getConfigs(fwUpdatesProperties);
vcConfigs = getConfigs(vcProperties); vcConfigs = getConfigs(vcProperties);
} }

View File

@ -75,8 +75,10 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
private final TbQueueAdmin coreAdmin; private final TbQueueAdmin coreAdmin;
private final TbQueueAdmin ruleEngineAdmin; private final TbQueueAdmin ruleEngineAdmin;
private final TbQueueAdmin jsExecutorAdmin; private final TbQueueAdmin jsExecutorRequestAdmin;
private final TbQueueAdmin transportApiAdmin; private final TbQueueAdmin jsExecutorResponseAdmin;
private final TbQueueAdmin transportApiRequestAdmin;
private final TbQueueAdmin transportApiResponseAdmin;
private final TbQueueAdmin notificationAdmin; private final TbQueueAdmin notificationAdmin;
private final TbQueueAdmin fwUpdatesAdmin; private final TbQueueAdmin fwUpdatesAdmin;
private final TbQueueAdmin vcAdmin; private final TbQueueAdmin vcAdmin;
@ -106,8 +108,10 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
this.coreAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getCoreConfigs()); this.coreAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getCoreConfigs());
this.ruleEngineAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getRuleEngineConfigs()); this.ruleEngineAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getRuleEngineConfigs());
this.jsExecutorAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getJsExecutorConfigs()); this.jsExecutorRequestAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getJsExecutorRequestConfigs());
this.transportApiAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getTransportApiConfigs()); this.jsExecutorResponseAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getJsExecutorResponseConfigs());
this.transportApiRequestAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getTransportApiRequestConfigs());
this.transportApiResponseAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getTransportApiResponseConfigs());
this.notificationAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getNotificationsConfigs()); this.notificationAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getNotificationsConfigs());
this.fwUpdatesAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getFwUpdatesConfigs()); this.fwUpdatesAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getFwUpdatesConfigs());
this.vcAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getVcConfigs()); this.vcAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getVcConfigs());
@ -237,7 +241,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
consumerBuilder.clientId("monolith-transport-api-consumer-" + serviceInfoProvider.getServiceId()); consumerBuilder.clientId("monolith-transport-api-consumer-" + serviceInfoProvider.getServiceId());
consumerBuilder.groupId("monolith-transport-api-consumer"); consumerBuilder.groupId("monolith-transport-api-consumer");
consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportApiRequestMsg.parseFrom(msg.getData()), msg.getHeaders())); consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportApiRequestMsg.parseFrom(msg.getData()), msg.getHeaders()));
consumerBuilder.admin(transportApiAdmin); consumerBuilder.admin(transportApiRequestAdmin);
consumerBuilder.statsService(consumerStatsService); consumerBuilder.statsService(consumerStatsService);
return consumerBuilder.build(); return consumerBuilder.build();
} }
@ -248,7 +252,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
requestBuilder.settings(kafkaSettings); requestBuilder.settings(kafkaSettings);
requestBuilder.clientId("monolith-transport-api-producer-" + serviceInfoProvider.getServiceId()); requestBuilder.clientId("monolith-transport-api-producer-" + serviceInfoProvider.getServiceId());
requestBuilder.defaultTopic(transportApiSettings.getResponsesTopic()); requestBuilder.defaultTopic(transportApiSettings.getResponsesTopic());
requestBuilder.admin(transportApiAdmin); requestBuilder.admin(transportApiResponseAdmin);
return requestBuilder.build(); return requestBuilder.build();
} }
@ -259,7 +263,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
requestBuilder.settings(kafkaSettings); requestBuilder.settings(kafkaSettings);
requestBuilder.clientId("producer-js-invoke-" + serviceInfoProvider.getServiceId()); requestBuilder.clientId("producer-js-invoke-" + serviceInfoProvider.getServiceId());
requestBuilder.defaultTopic(jsInvokeSettings.getRequestTopic()); requestBuilder.defaultTopic(jsInvokeSettings.getRequestTopic());
requestBuilder.admin(jsExecutorAdmin); requestBuilder.admin(jsExecutorRequestAdmin);
TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder<TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> responseBuilder = TbKafkaConsumerTemplate.builder(); TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder<TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> responseBuilder = TbKafkaConsumerTemplate.builder();
responseBuilder.settings(kafkaSettings); responseBuilder.settings(kafkaSettings);
@ -273,11 +277,11 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
} }
); );
responseBuilder.statsService(consumerStatsService); responseBuilder.statsService(consumerStatsService);
responseBuilder.admin(jsExecutorAdmin); responseBuilder.admin(jsExecutorResponseAdmin);
DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder
<TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>, TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> builder = DefaultTbQueueRequestTemplate.builder(); <TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>, TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> builder = DefaultTbQueueRequestTemplate.builder();
builder.queueAdmin(jsExecutorAdmin); builder.queueAdmin(jsExecutorResponseAdmin);
builder.requestTemplate(requestBuilder.build()); builder.requestTemplate(requestBuilder.build());
builder.responseTemplate(responseBuilder.build()); builder.responseTemplate(responseBuilder.build());
builder.maxPendingRequests(jsInvokeSettings.getMaxPendingRequests()); builder.maxPendingRequests(jsInvokeSettings.getMaxPendingRequests());
@ -350,11 +354,17 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
if (ruleEngineAdmin != null) { if (ruleEngineAdmin != null) {
ruleEngineAdmin.destroy(); ruleEngineAdmin.destroy();
} }
if (jsExecutorAdmin != null) { if (jsExecutorRequestAdmin != null) {
jsExecutorAdmin.destroy(); jsExecutorRequestAdmin.destroy();
} }
if (transportApiAdmin != null) { if (jsExecutorResponseAdmin != null) {
transportApiAdmin.destroy(); jsExecutorResponseAdmin.destroy();
}
if (transportApiRequestAdmin != null) {
transportApiRequestAdmin.destroy();
}
if (transportApiResponseAdmin != null) {
transportApiResponseAdmin.destroy();
} }
if (notificationAdmin != null) { if (notificationAdmin != null) {
notificationAdmin.destroy(); notificationAdmin.destroy();

View File

@ -73,8 +73,10 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
private final TbQueueAdmin coreAdmin; private final TbQueueAdmin coreAdmin;
private final TbQueueAdmin ruleEngineAdmin; private final TbQueueAdmin ruleEngineAdmin;
private final TbQueueAdmin jsExecutorAdmin; private final TbQueueAdmin jsExecutorRequestAdmin;
private final TbQueueAdmin transportApiAdmin; private final TbQueueAdmin jsExecutorResponseAdmin;
private final TbQueueAdmin transportApiRequestAdmin;
private final TbQueueAdmin transportApiResponseAdmin;
private final TbQueueAdmin notificationAdmin; private final TbQueueAdmin notificationAdmin;
private final TbQueueAdmin fwUpdatesAdmin; private final TbQueueAdmin fwUpdatesAdmin;
private final TbQueueAdmin vcAdmin; private final TbQueueAdmin vcAdmin;
@ -103,8 +105,10 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
this.coreAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getCoreConfigs()); this.coreAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getCoreConfigs());
this.ruleEngineAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getRuleEngineConfigs()); this.ruleEngineAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getRuleEngineConfigs());
this.jsExecutorAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getJsExecutorConfigs()); this.jsExecutorRequestAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getJsExecutorRequestConfigs());
this.transportApiAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getTransportApiConfigs()); this.jsExecutorResponseAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getJsExecutorResponseConfigs());
this.transportApiRequestAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getTransportApiRequestConfigs());
this.transportApiResponseAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getTransportApiResponseConfigs());
this.notificationAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getNotificationsConfigs()); this.notificationAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getNotificationsConfigs());
this.fwUpdatesAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getFwUpdatesConfigs()); this.fwUpdatesAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getFwUpdatesConfigs());
this.vcAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getVcConfigs()); this.vcAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getVcConfigs());
@ -194,7 +198,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
consumerBuilder.clientId("tb-core-transport-api-consumer-" + serviceInfoProvider.getServiceId()); consumerBuilder.clientId("tb-core-transport-api-consumer-" + serviceInfoProvider.getServiceId());
consumerBuilder.groupId("tb-core-transport-api-consumer"); consumerBuilder.groupId("tb-core-transport-api-consumer");
consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportApiRequestMsg.parseFrom(msg.getData()), msg.getHeaders())); consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportApiRequestMsg.parseFrom(msg.getData()), msg.getHeaders()));
consumerBuilder.admin(transportApiAdmin); consumerBuilder.admin(transportApiRequestAdmin);
consumerBuilder.statsService(consumerStatsService); consumerBuilder.statsService(consumerStatsService);
return consumerBuilder.build(); return consumerBuilder.build();
} }
@ -205,7 +209,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
requestBuilder.settings(kafkaSettings); requestBuilder.settings(kafkaSettings);
requestBuilder.clientId("tb-core-transport-api-producer-" + serviceInfoProvider.getServiceId()); requestBuilder.clientId("tb-core-transport-api-producer-" + serviceInfoProvider.getServiceId());
requestBuilder.defaultTopic(transportApiSettings.getResponsesTopic()); requestBuilder.defaultTopic(transportApiSettings.getResponsesTopic());
requestBuilder.admin(transportApiAdmin); requestBuilder.admin(transportApiResponseAdmin);
return requestBuilder.build(); return requestBuilder.build();
} }
@ -216,7 +220,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
requestBuilder.settings(kafkaSettings); requestBuilder.settings(kafkaSettings);
requestBuilder.clientId("producer-js-invoke-" + serviceInfoProvider.getServiceId()); requestBuilder.clientId("producer-js-invoke-" + serviceInfoProvider.getServiceId());
requestBuilder.defaultTopic(jsInvokeSettings.getRequestTopic()); requestBuilder.defaultTopic(jsInvokeSettings.getRequestTopic());
requestBuilder.admin(jsExecutorAdmin); requestBuilder.admin(jsExecutorRequestAdmin);
TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder<TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> responseBuilder = TbKafkaConsumerTemplate.builder(); TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder<TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> responseBuilder = TbKafkaConsumerTemplate.builder();
responseBuilder.settings(kafkaSettings); responseBuilder.settings(kafkaSettings);
@ -229,12 +233,12 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
return new TbProtoQueueMsg<>(msg.getKey(), builder.build(), msg.getHeaders()); return new TbProtoQueueMsg<>(msg.getKey(), builder.build(), msg.getHeaders());
} }
); );
responseBuilder.admin(jsExecutorAdmin); responseBuilder.admin(jsExecutorResponseAdmin);
responseBuilder.statsService(consumerStatsService); responseBuilder.statsService(consumerStatsService);
DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder
<TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>, TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> builder = DefaultTbQueueRequestTemplate.builder(); <TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>, TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> builder = DefaultTbQueueRequestTemplate.builder();
builder.queueAdmin(jsExecutorAdmin); builder.queueAdmin(jsExecutorResponseAdmin);
builder.requestTemplate(requestBuilder.build()); builder.requestTemplate(requestBuilder.build());
builder.responseTemplate(responseBuilder.build()); builder.responseTemplate(responseBuilder.build());
builder.maxPendingRequests(jsInvokeSettings.getMaxPendingRequests()); builder.maxPendingRequests(jsInvokeSettings.getMaxPendingRequests());
@ -307,11 +311,17 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
if (ruleEngineAdmin != null) { if (ruleEngineAdmin != null) {
ruleEngineAdmin.destroy(); ruleEngineAdmin.destroy();
} }
if (jsExecutorAdmin != null) { if (jsExecutorRequestAdmin != null) {
jsExecutorAdmin.destroy(); jsExecutorRequestAdmin.destroy();
} }
if (transportApiAdmin != null) { if (jsExecutorResponseAdmin != null) {
transportApiAdmin.destroy(); jsExecutorResponseAdmin.destroy();
}
if (transportApiRequestAdmin != null) {
transportApiRequestAdmin.destroy();
}
if (transportApiResponseAdmin != null) {
transportApiResponseAdmin.destroy();
} }
if (notificationAdmin != null) { if (notificationAdmin != null) {
notificationAdmin.destroy(); notificationAdmin.destroy();

View File

@ -68,7 +68,8 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
private final TbQueueAdmin coreAdmin; private final TbQueueAdmin coreAdmin;
private final TbQueueAdmin ruleEngineAdmin; private final TbQueueAdmin ruleEngineAdmin;
private final TbQueueAdmin jsExecutorAdmin; private final TbQueueAdmin jsExecutorRequestAdmin;
private final TbQueueAdmin jsExecutorResponseAdmin;
private final TbQueueAdmin notificationAdmin; private final TbQueueAdmin notificationAdmin;
private final TbQueueAdmin fwUpdatesAdmin; private final TbQueueAdmin fwUpdatesAdmin;
private final AtomicLong consumerCount = new AtomicLong(); private final AtomicLong consumerCount = new AtomicLong();
@ -92,7 +93,8 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
this.coreAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getCoreConfigs()); this.coreAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getCoreConfigs());
this.ruleEngineAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getRuleEngineConfigs()); this.ruleEngineAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getRuleEngineConfigs());
this.jsExecutorAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getJsExecutorConfigs()); this.jsExecutorRequestAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getJsExecutorRequestConfigs());
this.jsExecutorResponseAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getJsExecutorResponseConfigs());
this.notificationAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getNotificationsConfigs()); this.notificationAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getNotificationsConfigs());
this.fwUpdatesAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getFwUpdatesConfigs()); this.fwUpdatesAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getFwUpdatesConfigs());
} }
@ -191,7 +193,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
requestBuilder.settings(kafkaSettings); requestBuilder.settings(kafkaSettings);
requestBuilder.clientId("producer-js-invoke-" + serviceInfoProvider.getServiceId()); requestBuilder.clientId("producer-js-invoke-" + serviceInfoProvider.getServiceId());
requestBuilder.defaultTopic(jsInvokeSettings.getRequestTopic()); requestBuilder.defaultTopic(jsInvokeSettings.getRequestTopic());
requestBuilder.admin(jsExecutorAdmin); requestBuilder.admin(jsExecutorRequestAdmin);
TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder<TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> responseBuilder = TbKafkaConsumerTemplate.builder(); TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder<TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> responseBuilder = TbKafkaConsumerTemplate.builder();
responseBuilder.settings(kafkaSettings); responseBuilder.settings(kafkaSettings);
@ -204,12 +206,12 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
return new TbProtoQueueMsg<>(msg.getKey(), builder.build(), msg.getHeaders()); return new TbProtoQueueMsg<>(msg.getKey(), builder.build(), msg.getHeaders());
} }
); );
responseBuilder.admin(jsExecutorAdmin); responseBuilder.admin(jsExecutorResponseAdmin);
responseBuilder.statsService(consumerStatsService); responseBuilder.statsService(consumerStatsService);
DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder
<TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>, TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> builder = DefaultTbQueueRequestTemplate.builder(); <TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>, TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> builder = DefaultTbQueueRequestTemplate.builder();
builder.queueAdmin(jsExecutorAdmin); builder.queueAdmin(jsExecutorResponseAdmin);
builder.requestTemplate(requestBuilder.build()); builder.requestTemplate(requestBuilder.build());
builder.responseTemplate(responseBuilder.build()); builder.responseTemplate(responseBuilder.build());
builder.maxPendingRequests(jsInvokeSettings.getMaxPendingRequests()); builder.maxPendingRequests(jsInvokeSettings.getMaxPendingRequests());
@ -236,8 +238,11 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
if (ruleEngineAdmin != null) { if (ruleEngineAdmin != null) {
ruleEngineAdmin.destroy(); ruleEngineAdmin.destroy();
} }
if (jsExecutorAdmin != null) { if (jsExecutorRequestAdmin != null) {
jsExecutorAdmin.destroy(); jsExecutorRequestAdmin.destroy();
}
if (jsExecutorResponseAdmin != null) {
jsExecutorResponseAdmin.destroy();
} }
if (notificationAdmin != null) { if (notificationAdmin != null) {
notificationAdmin.destroy(); notificationAdmin.destroy();

View File

@ -59,7 +59,8 @@ public class KafkaTbTransportQueueFactory implements TbTransportQueueFactory {
private final TbQueueAdmin coreAdmin; private final TbQueueAdmin coreAdmin;
private final TbQueueAdmin ruleEngineAdmin; private final TbQueueAdmin ruleEngineAdmin;
private final TbQueueAdmin transportApiAdmin; private final TbQueueAdmin transportApiRequestAdmin;
private final TbQueueAdmin transportApiResponseAdmin;
private final TbQueueAdmin notificationAdmin; private final TbQueueAdmin notificationAdmin;
public KafkaTbTransportQueueFactory(TbKafkaSettings kafkaSettings, public KafkaTbTransportQueueFactory(TbKafkaSettings kafkaSettings,
@ -80,7 +81,8 @@ public class KafkaTbTransportQueueFactory implements TbTransportQueueFactory {
this.coreAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getCoreConfigs()); this.coreAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getCoreConfigs());
this.ruleEngineAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getRuleEngineConfigs()); this.ruleEngineAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getRuleEngineConfigs());
this.transportApiAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getTransportApiConfigs()); this.transportApiRequestAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getTransportApiRequestConfigs());
this.transportApiResponseAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getTransportApiResponseConfigs());
this.notificationAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getNotificationsConfigs()); this.notificationAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getNotificationsConfigs());
} }
@ -90,7 +92,7 @@ public class KafkaTbTransportQueueFactory implements TbTransportQueueFactory {
requestBuilder.settings(kafkaSettings); requestBuilder.settings(kafkaSettings);
requestBuilder.clientId("transport-api-request-" + serviceInfoProvider.getServiceId()); requestBuilder.clientId("transport-api-request-" + serviceInfoProvider.getServiceId());
requestBuilder.defaultTopic(transportApiSettings.getRequestsTopic()); requestBuilder.defaultTopic(transportApiSettings.getRequestsTopic());
requestBuilder.admin(transportApiAdmin); requestBuilder.admin(transportApiRequestAdmin);
TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder<TbProtoQueueMsg<TransportApiResponseMsg>> responseBuilder = TbKafkaConsumerTemplate.builder(); TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder<TbProtoQueueMsg<TransportApiResponseMsg>> responseBuilder = TbKafkaConsumerTemplate.builder();
responseBuilder.settings(kafkaSettings); responseBuilder.settings(kafkaSettings);
@ -98,12 +100,12 @@ public class KafkaTbTransportQueueFactory implements TbTransportQueueFactory {
responseBuilder.clientId("transport-api-response-" + serviceInfoProvider.getServiceId()); responseBuilder.clientId("transport-api-response-" + serviceInfoProvider.getServiceId());
responseBuilder.groupId("transport-node-" + serviceInfoProvider.getServiceId()); responseBuilder.groupId("transport-node-" + serviceInfoProvider.getServiceId());
responseBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportApiResponseMsg.parseFrom(msg.getData()), msg.getHeaders())); responseBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportApiResponseMsg.parseFrom(msg.getData()), msg.getHeaders()));
responseBuilder.admin(transportApiAdmin); responseBuilder.admin(transportApiResponseAdmin);
responseBuilder.statsService(consumerStatsService); responseBuilder.statsService(consumerStatsService);
DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder
<TbProtoQueueMsg<TransportApiRequestMsg>, TbProtoQueueMsg<TransportApiResponseMsg>> templateBuilder = DefaultTbQueueRequestTemplate.builder(); <TbProtoQueueMsg<TransportApiRequestMsg>, TbProtoQueueMsg<TransportApiResponseMsg>> templateBuilder = DefaultTbQueueRequestTemplate.builder();
templateBuilder.queueAdmin(transportApiAdmin); templateBuilder.queueAdmin(transportApiResponseAdmin);
templateBuilder.requestTemplate(requestBuilder.build()); templateBuilder.requestTemplate(requestBuilder.build());
templateBuilder.responseTemplate(responseBuilder.build()); templateBuilder.responseTemplate(responseBuilder.build());
templateBuilder.maxPendingRequests(transportApiSettings.getMaxPendingRequests()); templateBuilder.maxPendingRequests(transportApiSettings.getMaxPendingRequests());
@ -163,8 +165,11 @@ public class KafkaTbTransportQueueFactory implements TbTransportQueueFactory {
if (ruleEngineAdmin != null) { if (ruleEngineAdmin != null) {
ruleEngineAdmin.destroy(); ruleEngineAdmin.destroy();
} }
if (transportApiAdmin != null) { if (transportApiRequestAdmin != null) {
transportApiAdmin.destroy(); transportApiRequestAdmin.destroy();
}
if (transportApiResponseAdmin != null) {
transportApiResponseAdmin.destroy();
} }
if (notificationAdmin != null) { if (notificationAdmin != null) {
notificationAdmin.destroy(); notificationAdmin.destroy();

View File

@ -172,7 +172,7 @@ queue:
topic-properties: topic-properties:
rule-engine: "${TB_QUEUE_KAFKA_RE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}" rule-engine: "${TB_QUEUE_KAFKA_RE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
core: "${TB_QUEUE_KAFKA_CORE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}" core: "${TB_QUEUE_KAFKA_CORE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
transport-api: "${TB_QUEUE_KAFKA_TA_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}" transport-api: "${TB_QUEUE_KAFKA_TA_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:10;min.insync.replicas:1}"
notifications: "${TB_QUEUE_KAFKA_NOTIFICATIONS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}" notifications: "${TB_QUEUE_KAFKA_NOTIFICATIONS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
aws_sqs: aws_sqs:
use_default_credential_provider_chain: "${TB_QUEUE_AWS_SQS_USE_DEFAULT_CREDENTIAL_PROVIDER_CHAIN:false}" use_default_credential_provider_chain: "${TB_QUEUE_AWS_SQS_USE_DEFAULT_CREDENTIAL_PROVIDER_CHAIN:false}"

View File

@ -159,7 +159,7 @@ queue:
topic-properties: topic-properties:
rule-engine: "${TB_QUEUE_KAFKA_RE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}" rule-engine: "${TB_QUEUE_KAFKA_RE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
core: "${TB_QUEUE_KAFKA_CORE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}" core: "${TB_QUEUE_KAFKA_CORE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
transport-api: "${TB_QUEUE_KAFKA_TA_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}" transport-api: "${TB_QUEUE_KAFKA_TA_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:10;min.insync.replicas:1}"
notifications: "${TB_QUEUE_KAFKA_NOTIFICATIONS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}" notifications: "${TB_QUEUE_KAFKA_NOTIFICATIONS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
aws_sqs: aws_sqs:
use_default_credential_provider_chain: "${TB_QUEUE_AWS_SQS_USE_DEFAULT_CREDENTIAL_PROVIDER_CHAIN:false}" use_default_credential_provider_chain: "${TB_QUEUE_AWS_SQS_USE_DEFAULT_CREDENTIAL_PROVIDER_CHAIN:false}"

View File

@ -235,10 +235,10 @@ queue:
- key: "session.timeout.ms" # refer to https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#consumerconfigs_session.timeout.ms - key: "session.timeout.ms" # refer to https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#consumerconfigs_session.timeout.ms
value: "${TB_QUEUE_KAFKA_SESSION_TIMEOUT_MS:10000}" # (10 seconds) value: "${TB_QUEUE_KAFKA_SESSION_TIMEOUT_MS:10000}" # (10 seconds)
topic-properties: topic-properties:
rule-engine: "${TB_QUEUE_KAFKA_RE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1}" rule-engine: "${TB_QUEUE_KAFKA_RE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
core: "${TB_QUEUE_KAFKA_CORE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1}" core: "${TB_QUEUE_KAFKA_CORE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
transport-api: "${TB_QUEUE_KAFKA_TA_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1}" transport-api: "${TB_QUEUE_KAFKA_TA_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:10;min.insync.replicas:1}"
notifications: "${TB_QUEUE_KAFKA_NOTIFICATIONS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1}" notifications: "${TB_QUEUE_KAFKA_NOTIFICATIONS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
aws_sqs: aws_sqs:
use_default_credential_provider_chain: "${TB_QUEUE_AWS_SQS_USE_DEFAULT_CREDENTIAL_PROVIDER_CHAIN:false}" use_default_credential_provider_chain: "${TB_QUEUE_AWS_SQS_USE_DEFAULT_CREDENTIAL_PROVIDER_CHAIN:false}"
access_key_id: "${TB_QUEUE_AWS_SQS_ACCESS_KEY_ID:YOUR_KEY}" access_key_id: "${TB_QUEUE_AWS_SQS_ACCESS_KEY_ID:YOUR_KEY}"

View File

@ -189,7 +189,7 @@ queue:
topic-properties: topic-properties:
rule-engine: "${TB_QUEUE_KAFKA_RE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}" rule-engine: "${TB_QUEUE_KAFKA_RE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
core: "${TB_QUEUE_KAFKA_CORE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}" core: "${TB_QUEUE_KAFKA_CORE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
transport-api: "${TB_QUEUE_KAFKA_TA_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}" transport-api: "${TB_QUEUE_KAFKA_TA_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:10;min.insync.replicas:1}"
notifications: "${TB_QUEUE_KAFKA_NOTIFICATIONS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}" notifications: "${TB_QUEUE_KAFKA_NOTIFICATIONS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
aws_sqs: aws_sqs:
use_default_credential_provider_chain: "${TB_QUEUE_AWS_SQS_USE_DEFAULT_CREDENTIAL_PROVIDER_CHAIN:false}" use_default_credential_provider_chain: "${TB_QUEUE_AWS_SQS_USE_DEFAULT_CREDENTIAL_PROVIDER_CHAIN:false}"

View File

@ -135,7 +135,7 @@ queue:
topic-properties: topic-properties:
rule-engine: "${TB_QUEUE_KAFKA_RE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}" rule-engine: "${TB_QUEUE_KAFKA_RE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
core: "${TB_QUEUE_KAFKA_CORE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}" core: "${TB_QUEUE_KAFKA_CORE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
transport-api: "${TB_QUEUE_KAFKA_TA_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}" transport-api: "${TB_QUEUE_KAFKA_TA_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:10;min.insync.replicas:1}"
notifications: "${TB_QUEUE_KAFKA_NOTIFICATIONS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}" notifications: "${TB_QUEUE_KAFKA_NOTIFICATIONS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
consumer-stats: consumer-stats:
enabled: "${TB_QUEUE_KAFKA_CONSUMER_STATS_ENABLED:true}" enabled: "${TB_QUEUE_KAFKA_CONSUMER_STATS_ENABLED:true}"