Merge pull request #12588 from AndriiLandiak/kafka-configurable-topics

Kafka: make more topics configurable
This commit is contained in:
Viacheslav Klimov 2025-03-03 11:37:52 +02:00 committed by GitHub
commit 09ce342596
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 69 additions and 19 deletions

View File

@ -1659,6 +1659,8 @@ queue:
core:
# Default topic name
topic: "${TB_QUEUE_CORE_TOPIC:tb_core}"
# For high-priority notifications that require minimum latency and processing time
notifications_topic: "${TB_QUEUE_CORE_NOTIFICATIONS_TOPIC:tb_core.notifications}"
# Interval in milliseconds to poll messages by Core microservices
poll-interval: "${TB_QUEUE_CORE_POLL_INTERVAL_MS:25}"
# Amount of partitions used by Core microservices
@ -1734,6 +1736,8 @@ queue:
rule-engine:
# Deprecated. It will be removed in the nearest releases
topic: "${TB_QUEUE_RULE_ENGINE_TOPIC:tb_rule_engine}"
# For high-priority notifications that require minimum latency and processing time
notifications_topic: "${TB_QUEUE_RULE_ENGINE_NOTIFICATIONS_TOPIC:tb_rule_engine.notifications}"
# Interval in milliseconds to poll messages by Rule Engine
poll-interval: "${TB_QUEUE_RULE_ENGINE_POLL_INTERVAL_MS:25}"
# Timeout for processing a message pack of Rule Engine
@ -1754,6 +1758,8 @@ queue:
event_topic: "${TB_QUEUE_CF_EVENT_TOPIC:tb_cf_event}"
# Topic name for Calculated Field (CF) compacted states
state_topic: "${TB_QUEUE_CF_STATE_TOPIC:tb_cf_state}"
# For high-priority notifications that require minimum latency and processing time
notifications_topic: "${TB_QUEUE_CF_NOTIFICATIONS_TOPIC:calculated_field.notifications}"
# Interval in milliseconds to poll messages by CF (Rule Engine) microservices
poll_interval: "${TB_QUEUE_CF_POLL_INTERVAL_MS:25}"
# Amount of partitions used by CF microservices
@ -1772,6 +1778,10 @@ queue:
edge:
# Default topic name
topic: "${TB_QUEUE_EDGE_TOPIC:tb_edge}"
# For high-priority notifications that require minimum latency and processing time
notifications_topic: "${TB_QUEUE_EDGE_NOTIFICATIONS_TOPIC:tb_edge.notifications}"
# For edge events messages
event_notifications_topic: "${TB_QUEUE_EDGE_EVENT_NOTIFICATIONS_TOPIC:tb_edge_event.notifications}"
# Amount of partitions used by Edge services
partitions: "${TB_QUEUE_EDGE_PARTITIONS:10}"
# Poll interval for topics related to Edge services

View File

@ -32,6 +32,24 @@ public class TopicService {
@Value("${queue.prefix:}")
private String prefix;
@Value("${queue.core.notifications-topic:tb_core.notifications}")
private String tbCoreNotificationsTopic;
@Value("${queue.rule-engine.notifications-topic:tb_rule_engine.notifications}")
private String tbRuleEngineNotificationsTopic;
@Value("${queue.transport.notifications-topics:tb_transport.notifications}")
private String tbTransportNotificationsTopic;
@Value("${queue.edge.notifications-topic:tb_edge.notifications}")
private String tbEdgeNotificationsTopic;
@Value("${queue.edge.event-notifications-topic:tb_edge_event.notifications}")
private String tbEdgeEventNotificationsTopic;
@Value("${queue.calculated_fields.notifications-topic:calculated_field.notifications}")
private String tbCalculatedFieldNotificationsTopic;
private final ConcurrentMap<String, TopicPartitionInfo> tbCoreNotificationTopics = new ConcurrentHashMap<>();
private final ConcurrentMap<String, TopicPartitionInfo> tbRuleEngineNotificationTopics = new ConcurrentHashMap<>();
private final ConcurrentMap<String, TopicPartitionInfo> tbEdgeNotificationTopics = new ConcurrentHashMap<>();
@ -48,24 +66,32 @@ public class TopicService {
public TopicPartitionInfo getNotificationsTopic(ServiceType serviceType, String serviceId) {
return switch (serviceType) {
case TB_CORE -> tbCoreNotificationTopics.computeIfAbsent(serviceId,
id -> buildNotificationsTopicPartitionInfo(serviceType, serviceId));
id -> buildNotificationsTopicPartitionInfo(tbCoreNotificationsTopic, serviceId));
case TB_RULE_ENGINE -> tbRuleEngineNotificationTopics.computeIfAbsent(serviceId,
id -> buildNotificationsTopicPartitionInfo(serviceType, serviceId));
default -> buildNotificationsTopicPartitionInfo(serviceType, serviceId);
id -> buildNotificationsTopicPartitionInfo(tbRuleEngineNotificationsTopic, serviceId));
case TB_TRANSPORT -> buildNotificationsTopicPartitionInfo(tbTransportNotificationsTopic, serviceId);
default -> throw new IllegalStateException("Unexpected service type: " + serviceType);
};
}
private TopicPartitionInfo buildNotificationsTopicPartitionInfo(String topic, String serviceId) {
return buildTopicPartitionInfo(buildNotificationTopicName(topic, serviceId), null, null, false);
}
public TopicPartitionInfo buildTopicPartitionInfo(String topic, TenantId tenantId, Integer partition, boolean myPartition) {
return new TopicPartitionInfo(buildTopicName(topic), tenantId, partition, myPartition);
}
public TopicPartitionInfo getEdgeNotificationsTopic(String serviceId) {
return tbEdgeNotificationTopics.computeIfAbsent(serviceId, id -> buildEdgeNotificationsTopicPartitionInfo(serviceId));
}
private TopicPartitionInfo buildEdgeNotificationsTopicPartitionInfo(String serviceId) {
return buildTopicPartitionInfo("tb_edge.notifications." + serviceId, null, null, false);
return buildTopicPartitionInfo(buildNotificationTopicName(tbEdgeNotificationsTopic, serviceId), null, null, false);
}
public TopicPartitionInfo getCalculatedFieldNotificationsTopic(String serviceId) {
return tbCalculatedFieldNotificationTopics.computeIfAbsent(serviceId,
id -> buildNotificationsTopicPartitionInfo("calculated_field", serviceId));
return tbCalculatedFieldNotificationTopics.computeIfAbsent(serviceId, id -> buildNotificationsTopicPartitionInfo(tbCalculatedFieldNotificationsTopic, serviceId));
}
public TopicPartitionInfo getEdgeEventNotificationsTopic(TenantId tenantId, EdgeId edgeId) {
@ -73,25 +99,17 @@ public class TopicService {
}
public TopicPartitionInfo buildEdgeEventNotificationsTopicPartitionInfo(TenantId tenantId, EdgeId edgeId) {
return buildTopicPartitionInfo("tb_edge_event.notifications." + tenantId + "." + edgeId, null, null, false);
}
private TopicPartitionInfo buildNotificationsTopicPartitionInfo(ServiceType serviceType, String serviceId) {
return buildNotificationsTopicPartitionInfo(serviceType.name().toLowerCase(), serviceId);
}
private TopicPartitionInfo buildNotificationsTopicPartitionInfo(String serviceType, String serviceId) {
return buildTopicPartitionInfo(serviceType + ".notifications." + serviceId, null, null, false);
}
public TopicPartitionInfo buildTopicPartitionInfo(String topic, TenantId tenantId, Integer partition, boolean myPartition) {
return new TopicPartitionInfo(buildTopicName(topic), tenantId, partition, myPartition);
return buildTopicPartitionInfo(tbEdgeEventNotificationsTopic + "." + tenantId + "." + edgeId, null, null, false);
}
public String buildTopicName(String topic) {
return prefix.isBlank() ? topic : prefix + "." + topic;
}
private String buildNotificationTopicName(String topic, String serviceId) {
return topic + "." + serviceId;
}
public String buildConsumerGroupId(String servicePrefix, TenantId tenantId, String queueName, Integer partitionId) {
return this.buildTopicName(
servicePrefix + queueName

View File

@ -151,6 +151,8 @@ queue:
core:
# Default topic name
topic: "${TB_QUEUE_CORE_TOPIC:tb_core}"
# For high-priority notifications that require minimum latency and processing time
notifications_topic: "${TB_QUEUE_CORE_NOTIFICATIONS_TOPIC:tb_core.notifications}"
# Interval in milliseconds to poll messages by Core microservices
poll-interval: "${TB_QUEUE_CORE_POLL_INTERVAL_MS:25}"
# Amount of partitions used by Core microservices

View File

@ -347,6 +347,8 @@ queue:
core:
# Default topic name
topic: "${TB_QUEUE_CORE_TOPIC:tb_core}"
# For high-priority notifications that require minimum latency and processing time
notifications_topic: "${TB_QUEUE_CORE_NOTIFICATIONS_TOPIC:tb_core.notifications}"
# Interval in milliseconds to poll messages by Core microservices
poll-interval: "${TB_QUEUE_CORE_POLL_INTERVAL_MS:25}"
# Amount of partitions used by Core microservices
@ -377,6 +379,8 @@ queue:
rule-engine:
# Deprecated. It will be removed in the nearest releases
topic: "${TB_QUEUE_RULE_ENGINE_TOPIC:tb_rule_engine}"
# For high-priority notifications that require minimum latency and processing time
notifications_topic: "${TB_QUEUE_RULE_ENGINE_NOTIFICATIONS_TOPIC:tb_rule_engine.notifications}"
# Interval in milliseconds to poll messages by Rule Engine
poll-interval: "${TB_QUEUE_RULE_ENGINE_POLL_INTERVAL_MS:25}"
# Timeout for processing a message pack of Rule Engine

View File

@ -296,6 +296,8 @@ queue:
core:
# Default topic name
topic: "${TB_QUEUE_CORE_TOPIC:tb_core}"
# For high-priority notifications that require minimum latency and processing time
notifications_topic: "${TB_QUEUE_CORE_NOTIFICATIONS_TOPIC:tb_core.notifications}"
# Interval in milliseconds to poll messages by Core microservices
poll-interval: "${TB_QUEUE_CORE_POLL_INTERVAL_MS:25}"
# Amount of partitions used by Core microservices
@ -326,6 +328,8 @@ queue:
rule-engine:
# Deprecated. It will be removed in the nearest releases
topic: "${TB_QUEUE_RULE_ENGINE_TOPIC:tb_rule_engine}"
# For high-priority notifications that require minimum latency and processing time
notifications_topic: "${TB_QUEUE_RULE_ENGINE_NOTIFICATIONS_TOPIC:tb_rule_engine.notifications}"
# Interval in milliseconds to poll messages by Rule Engine
poll-interval: "${TB_QUEUE_RULE_ENGINE_POLL_INTERVAL_MS:25}"
# Timeout for processing a message pack of Rule Engine

View File

@ -397,6 +397,8 @@ queue:
core:
# Default topic name
topic: "${TB_QUEUE_CORE_TOPIC:tb_core}"
# For high-priority notifications that require minimum latency and processing time
notifications_topic: "${TB_QUEUE_CORE_NOTIFICATIONS_TOPIC:tb_core.notifications}"
# Interval in milliseconds to poll messages by Core microservices
poll-interval: "${TB_QUEUE_CORE_POLL_INTERVAL_MS:25}"
# Amount of partitions used by Core microservices
@ -427,6 +429,8 @@ queue:
rule-engine:
# Deprecated. It will be removed in the nearest releases
topic: "${TB_QUEUE_RULE_ENGINE_TOPIC:tb_rule_engine}"
# For high-priority notifications that require minimum latency and processing time
notifications_topic: "${TB_QUEUE_RULE_ENGINE_NOTIFICATIONS_TOPIC:tb_rule_engine.notifications}"
# Interval in milliseconds to poll messages by Rule Engine
poll-interval: "${TB_QUEUE_RULE_ENGINE_POLL_INTERVAL_MS:25}"
# Timeout for processing a message pack of Rule Engine

View File

@ -330,6 +330,8 @@ queue:
core:
# Default topic name
topic: "${TB_QUEUE_CORE_TOPIC:tb_core}"
# For high-priority notifications that require minimum latency and processing time
notifications_topic: "${TB_QUEUE_CORE_NOTIFICATIONS_TOPIC:tb_core.notifications}"
# Interval in milliseconds to poll messages by Core microservices
poll-interval: "${TB_QUEUE_CORE_POLL_INTERVAL_MS:25}"
# Amount of partitions used by Core microservices
@ -360,6 +362,8 @@ queue:
rule-engine:
# Deprecated. It will be removed in the nearest releases
topic: "${TB_QUEUE_RULE_ENGINE_TOPIC:tb_rule_engine}"
# For high-priority notifications that require minimum latency and processing time
notifications_topic: "${TB_QUEUE_RULE_ENGINE_NOTIFICATIONS_TOPIC:tb_rule_engine.notifications}"
# Interval in milliseconds to poll messages by Rule Engine
poll-interval: "${TB_QUEUE_RULE_ENGINE_POLL_INTERVAL_MS:25}"
# Timeout for processing a message pack of Rule Engine

View File

@ -283,6 +283,8 @@ queue:
core:
# Default topic name
topic: "${TB_QUEUE_CORE_TOPIC:tb_core}"
# For high-priority notifications that require minimum latency and processing time
notifications_topic: "${TB_QUEUE_CORE_NOTIFICATIONS_TOPIC:tb_core.notifications}"
# Interval in milliseconds to poll messages by Core microservices
poll-interval: "${TB_QUEUE_CORE_POLL_INTERVAL_MS:25}"
# Amount of partitions used by Core microservices
@ -313,6 +315,8 @@ queue:
rule-engine:
# Deprecated. It will be removed in the nearest releases
topic: "${TB_QUEUE_RULE_ENGINE_TOPIC:tb_rule_engine}"
# For high-priority notifications that require minimum latency and processing time
notifications_topic: "${TB_QUEUE_RULE_ENGINE_NOTIFICATIONS_TOPIC:tb_rule_engine.notifications}"
# Interval in milliseconds to poll messages by Rule Engine
poll-interval: "${TB_QUEUE_RULE_ENGINE_POLL_INTERVAL_MS:25}"
# Timeout for processing a message pack of Rule Engine