From e3a72fec09ca0f82584123b2ee2050ec41824db2 Mon Sep 17 00:00:00 2001 From: Andrii Landiak Date: Tue, 4 Feb 2025 12:39:06 +0200 Subject: [PATCH 1/5] Refactor TopicService. Change ymls to be able to configure notification topic names --- .../src/main/resources/thingsboard.yml | 7 +++- .../server/queue/discovery/TopicService.java | 41 +++++++++++++------ .../src/main/resources/tb-vc-executor.yml | 2 + .../src/main/resources/tb-coap-transport.yml | 4 ++ .../src/main/resources/tb-http-transport.yml | 4 ++ .../src/main/resources/tb-lwm2m-transport.yml | 4 ++ .../src/main/resources/tb-mqtt-transport.yml | 4 ++ .../src/main/resources/tb-snmp-transport.yml | 4 ++ 8 files changed, 57 insertions(+), 13 deletions(-) diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index f29cc55197..4ee08c7095 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -1644,6 +1644,8 @@ queue: core: # Default topic name topic: "${TB_QUEUE_CORE_TOPIC:tb_core}" + # For high-priority notifications that require minimum latency and processing time + notifications_topic: "${TB_QUEUE_CORE_NOTIFICATIONS_TOPIC:tb_core.notifications}" # Interval in milliseconds to poll messages by Core microservices poll-interval: "${TB_QUEUE_CORE_POLL_INTERVAL_MS:25}" # Amount of partitions used by Core microservices @@ -1688,7 +1690,6 @@ queue: enabled: "${TB_HOUSEKEEPER_STATS_ENABLED:true}" # Statistics printing interval for Housekeeper print-interval-ms: "${TB_HOUSEKEEPER_STATS_PRINT_INTERVAL_MS:60000}" - vc: # Default topic name topic: "${TB_QUEUE_VC_TOPIC:tb_version_control}" @@ -1720,6 +1721,8 @@ queue: rule-engine: # Deprecated. It will be removed in the nearest releases topic: "${TB_QUEUE_RULE_ENGINE_TOPIC:tb_rule_engine}" + # For high-priority notifications that require minimum latency and processing time + notifications_topic: "${TB_QUEUE_RULE_ENGINE_NOTIFICATIONS_TOPIC:tb_rule_engine.notifications}" # Interval in milliseconds to poll messages by Rule Engine poll-interval: "${TB_QUEUE_RULE_ENGINE_POLL_INTERVAL_MS:25}" # Timeout for processing a message pack of Rule Engine @@ -1743,6 +1746,8 @@ queue: edge: # Default topic name topic: "${TB_QUEUE_EDGE_TOPIC:tb_edge}" + # For high-priority notifications that require minimum latency and processing time + notifications_topic: "${TB_QUEUE_EDGE_NOTIFICATIONS_TOPIC:tb_edge.notifications}" # Amount of partitions used by Edge services partitions: "${TB_QUEUE_EDGE_PARTITIONS:10}" # Poll interval for topics related to Edge services diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/TopicService.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/TopicService.java index 927c311a2d..254d0a95d6 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/TopicService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/TopicService.java @@ -32,6 +32,18 @@ public class TopicService { @Value("${queue.prefix:}") private String prefix; + @Value("${queue.core.notifications-topic:tb_core.notifications}") + private String tbCoreNotificationsTopic; + + @Value("${queue.rule-engine.notifications-topic:tb_rule_engine.notifications}") + private String tbRuleEngineNotificationsTopic; + + @Value("${queue.transport.notifications-topics:tb_transport.notifications}") + private String tbTransportNotificationsTopic; + + @Value("${queue.edge.notifications-topic:tb_edge.notifications}") + private String tbEdgeNotificationsTopic; + private final ConcurrentMap tbCoreNotificationTopics = new ConcurrentHashMap<>(); private final ConcurrentMap tbRuleEngineNotificationTopics = new ConcurrentHashMap<>(); private final ConcurrentMap tbEdgeNotificationTopics = new ConcurrentHashMap<>(); @@ -47,19 +59,28 @@ public class TopicService { public TopicPartitionInfo getNotificationsTopic(ServiceType serviceType, String serviceId) { return switch (serviceType) { case TB_CORE -> tbCoreNotificationTopics.computeIfAbsent(serviceId, - id -> buildNotificationsTopicPartitionInfo(serviceType, serviceId)); + id -> buildNotificationsTopicPartitionInfo(tbCoreNotificationsTopic, serviceId)); case TB_RULE_ENGINE -> tbRuleEngineNotificationTopics.computeIfAbsent(serviceId, - id -> buildNotificationsTopicPartitionInfo(serviceType, serviceId)); - default -> buildNotificationsTopicPartitionInfo(serviceType, serviceId); + id -> buildNotificationsTopicPartitionInfo(tbRuleEngineNotificationsTopic, serviceId)); + case TB_TRANSPORT -> buildNotificationsTopicPartitionInfo(tbTransportNotificationsTopic, serviceId); + default -> throw new IllegalStateException("Unexpected service type: " + serviceType); }; } + private TopicPartitionInfo buildNotificationsTopicPartitionInfo(String topic, String serviceId) { + return buildTopicPartitionInfo(buildNotificationTopicName(topic, serviceId), null, null, false); + } + + public TopicPartitionInfo buildTopicPartitionInfo(String topic, TenantId tenantId, Integer partition, boolean myPartition) { + return new TopicPartitionInfo(buildTopicName(topic), tenantId, partition, myPartition); + } + public TopicPartitionInfo getEdgeNotificationsTopic(String serviceId) { return tbEdgeNotificationTopics.computeIfAbsent(serviceId, id -> buildEdgeNotificationsTopicPartitionInfo(serviceId)); } private TopicPartitionInfo buildEdgeNotificationsTopicPartitionInfo(String serviceId) { - return buildTopicPartitionInfo("tb_edge.notifications." + serviceId, null, null, false); + return buildTopicPartitionInfo(buildNotificationTopicName(tbEdgeNotificationsTopic, serviceId), null, null, false); } public TopicPartitionInfo getEdgeEventNotificationsTopic(TenantId tenantId, EdgeId edgeId) { @@ -70,18 +91,14 @@ public class TopicService { return buildTopicPartitionInfo("tb_edge_event.notifications." + tenantId + "." + edgeId, null, null, false); } - private TopicPartitionInfo buildNotificationsTopicPartitionInfo(ServiceType serviceType, String serviceId) { - return buildTopicPartitionInfo(serviceType.name().toLowerCase() + ".notifications." + serviceId, null, null, false); - } - - public TopicPartitionInfo buildTopicPartitionInfo(String topic, TenantId tenantId, Integer partition, boolean myPartition) { - return new TopicPartitionInfo(buildTopicName(topic), tenantId, partition, myPartition); - } - public String buildTopicName(String topic) { return prefix.isBlank() ? topic : prefix + "." + topic; } + private String buildNotificationTopicName(String topic, String serviceId) { + return topic + "." + serviceId; + } + public String buildConsumerGroupId(String servicePrefix, TenantId tenantId, String queueName, Integer partitionId) { return this.buildTopicName( servicePrefix + queueName diff --git a/msa/vc-executor/src/main/resources/tb-vc-executor.yml b/msa/vc-executor/src/main/resources/tb-vc-executor.yml index cba83e5ff7..595e514907 100644 --- a/msa/vc-executor/src/main/resources/tb-vc-executor.yml +++ b/msa/vc-executor/src/main/resources/tb-vc-executor.yml @@ -151,6 +151,8 @@ queue: core: # Default topic name topic: "${TB_QUEUE_CORE_TOPIC:tb_core}" + # For high-priority notifications that require minimum latency and processing time + notifications_topic: "${TB_QUEUE_CORE_NOTIFICATIONS_TOPIC:tb_core.notifications}" # Interval in milliseconds to poll messages by Core microservices poll-interval: "${TB_QUEUE_CORE_POLL_INTERVAL_MS:25}" # Amount of partitions used by Core microservices diff --git a/transport/coap/src/main/resources/tb-coap-transport.yml b/transport/coap/src/main/resources/tb-coap-transport.yml index 66ab703a54..794eaed047 100644 --- a/transport/coap/src/main/resources/tb-coap-transport.yml +++ b/transport/coap/src/main/resources/tb-coap-transport.yml @@ -351,6 +351,8 @@ queue: core: # Default topic name topic: "${TB_QUEUE_CORE_TOPIC:tb_core}" + # For high-priority notifications that require minimum latency and processing time + notifications_topic: "${TB_QUEUE_CORE_NOTIFICATIONS_TOPIC:tb_core.notifications}" # Interval in milliseconds to poll messages by Core microservices poll-interval: "${TB_QUEUE_CORE_POLL_INTERVAL_MS:25}" # Amount of partitions used by Core microservices @@ -381,6 +383,8 @@ queue: rule-engine: # Deprecated. It will be removed in the nearest releases topic: "${TB_QUEUE_RULE_ENGINE_TOPIC:tb_rule_engine}" + # For high-priority notifications that require minimum latency and processing time + notifications_topic: "${TB_QUEUE_RULE_ENGINE_NOTIFICATIONS_TOPIC:tb_rule_engine.notifications}" # Interval in milliseconds to poll messages by Rule Engine poll-interval: "${TB_QUEUE_RULE_ENGINE_POLL_INTERVAL_MS:25}" # Timeout for processing a message pack of Rule Engine diff --git a/transport/http/src/main/resources/tb-http-transport.yml b/transport/http/src/main/resources/tb-http-transport.yml index be91e4fe67..860059b1ee 100644 --- a/transport/http/src/main/resources/tb-http-transport.yml +++ b/transport/http/src/main/resources/tb-http-transport.yml @@ -296,6 +296,8 @@ queue: core: # Default topic name topic: "${TB_QUEUE_CORE_TOPIC:tb_core}" + # For high-priority notifications that require minimum latency and processing time + notifications_topic: "${TB_QUEUE_CORE_NOTIFICATIONS_TOPIC:tb_core.notifications}" # Interval in milliseconds to poll messages by Core microservices poll-interval: "${TB_QUEUE_CORE_POLL_INTERVAL_MS:25}" # Amount of partitions used by Core microservices @@ -326,6 +328,8 @@ queue: rule-engine: # Deprecated. It will be removed in the nearest releases topic: "${TB_QUEUE_RULE_ENGINE_TOPIC:tb_rule_engine}" + # For high-priority notifications that require minimum latency and processing time + notifications_topic: "${TB_QUEUE_RULE_ENGINE_NOTIFICATIONS_TOPIC:tb_rule_engine.notifications}" # Interval in milliseconds to poll messages by Rule Engine poll-interval: "${TB_QUEUE_RULE_ENGINE_POLL_INTERVAL_MS:25}" # Timeout for processing a message pack of Rule Engine diff --git a/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml b/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml index 7bdd68baf1..824d998836 100644 --- a/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml +++ b/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml @@ -397,6 +397,8 @@ queue: core: # Default topic name topic: "${TB_QUEUE_CORE_TOPIC:tb_core}" + # For high-priority notifications that require minimum latency and processing time + notifications_topic: "${TB_QUEUE_CORE_NOTIFICATIONS_TOPIC:tb_core.notifications}" # Interval in milliseconds to poll messages by Core microservices poll-interval: "${TB_QUEUE_CORE_POLL_INTERVAL_MS:25}" # Amount of partitions used by Core microservices @@ -427,6 +429,8 @@ queue: rule-engine: # Deprecated. It will be removed in the nearest releases topic: "${TB_QUEUE_RULE_ENGINE_TOPIC:tb_rule_engine}" + # For high-priority notifications that require minimum latency and processing time + notifications_topic: "${TB_QUEUE_RULE_ENGINE_NOTIFICATIONS_TOPIC:tb_rule_engine.notifications}" # Interval in milliseconds to poll messages by Rule Engine poll-interval: "${TB_QUEUE_RULE_ENGINE_POLL_INTERVAL_MS:25}" # Timeout for processing a message pack of Rule Engine diff --git a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml index 8d9a60a319..4b0adab7d1 100644 --- a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml +++ b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml @@ -330,6 +330,8 @@ queue: core: # Default topic name topic: "${TB_QUEUE_CORE_TOPIC:tb_core}" + # For high-priority notifications that require minimum latency and processing time + notifications_topic: "${TB_QUEUE_CORE_NOTIFICATIONS_TOPIC:tb_core.notifications}" # Interval in milliseconds to poll messages by Core microservices poll-interval: "${TB_QUEUE_CORE_POLL_INTERVAL_MS:25}" # Amount of partitions used by Core microservices @@ -360,6 +362,8 @@ queue: rule-engine: # Deprecated. It will be removed in the nearest releases topic: "${TB_QUEUE_RULE_ENGINE_TOPIC:tb_rule_engine}" + # For high-priority notifications that require minimum latency and processing time + notifications_topic: "${TB_QUEUE_RULE_ENGINE_NOTIFICATIONS_TOPIC:tb_rule_engine.notifications}" # Interval in milliseconds to poll messages by Rule Engine poll-interval: "${TB_QUEUE_RULE_ENGINE_POLL_INTERVAL_MS:25}" # Timeout for processing a message pack of Rule Engine diff --git a/transport/snmp/src/main/resources/tb-snmp-transport.yml b/transport/snmp/src/main/resources/tb-snmp-transport.yml index 4c11bd0018..e6fe253e79 100644 --- a/transport/snmp/src/main/resources/tb-snmp-transport.yml +++ b/transport/snmp/src/main/resources/tb-snmp-transport.yml @@ -283,6 +283,8 @@ queue: core: # Default topic name topic: "${TB_QUEUE_CORE_TOPIC:tb_core}" + # For high-priority notifications that require minimum latency and processing time + notifications_topic: "${TB_QUEUE_CORE_NOTIFICATIONS_TOPIC:tb_core.notifications}" # Interval in milliseconds to poll messages by Core microservices poll-interval: "${TB_QUEUE_CORE_POLL_INTERVAL_MS:25}" # Amount of partitions used by Core microservices @@ -313,6 +315,8 @@ queue: rule-engine: # Deprecated. It will be removed in the nearest releases topic: "${TB_QUEUE_RULE_ENGINE_TOPIC:tb_rule_engine}" + # For high-priority notifications that require minimum latency and processing time + notifications_topic: "${TB_QUEUE_RULE_ENGINE_NOTIFICATIONS_TOPIC:tb_rule_engine.notifications}" # Interval in milliseconds to poll messages by Rule Engine poll-interval: "${TB_QUEUE_RULE_ENGINE_POLL_INTERVAL_MS:25}" # Timeout for processing a message pack of Rule Engine From 8de861f8977d1da48b85959dd520983aaa6a0876 Mon Sep 17 00:00:00 2001 From: Andrii Landiak Date: Fri, 7 Feb 2025 16:11:33 +0200 Subject: [PATCH 2/5] Improve customization for edge-event topic --- application/src/main/resources/thingsboard.yml | 2 ++ .../org/thingsboard/server/queue/discovery/TopicService.java | 5 ++++- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 4ee08c7095..7b1df1d9d7 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -1748,6 +1748,8 @@ queue: topic: "${TB_QUEUE_EDGE_TOPIC:tb_edge}" # For high-priority notifications that require minimum latency and processing time notifications_topic: "${TB_QUEUE_EDGE_NOTIFICATIONS_TOPIC:tb_edge.notifications}" + # For edge events messages + event_notifications_topic: "${TB_QUEUE_EDGE_EVENT_NOTIFICATIONS_TOPIC:tb_edge_event.notifications}" # Amount of partitions used by Edge services partitions: "${TB_QUEUE_EDGE_PARTITIONS:10}" # Poll interval for topics related to Edge services diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/TopicService.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/TopicService.java index 254d0a95d6..bab55eece8 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/TopicService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/TopicService.java @@ -44,6 +44,9 @@ public class TopicService { @Value("${queue.edge.notifications-topic:tb_edge.notifications}") private String tbEdgeNotificationsTopic; + @Value("${queue.edge.event-notifications-topic:tb_edge.notifications}") + private String tbEdgeEventNotificationsTopic; + private final ConcurrentMap tbCoreNotificationTopics = new ConcurrentHashMap<>(); private final ConcurrentMap tbRuleEngineNotificationTopics = new ConcurrentHashMap<>(); private final ConcurrentMap tbEdgeNotificationTopics = new ConcurrentHashMap<>(); @@ -88,7 +91,7 @@ public class TopicService { } public TopicPartitionInfo buildEdgeEventNotificationsTopicPartitionInfo(TenantId tenantId, EdgeId edgeId) { - return buildTopicPartitionInfo("tb_edge_event.notifications." + tenantId + "." + edgeId, null, null, false); + return buildTopicPartitionInfo(tbEdgeEventNotificationsTopic + "." + tenantId + "." + edgeId, null, null, false); } public String buildTopicName(String topic) { From b54e0cd59c0d70181c509279a80bf985e61fd719 Mon Sep 17 00:00:00 2001 From: Andrii Landiak Date: Fri, 7 Feb 2025 16:12:53 +0200 Subject: [PATCH 3/5] Rename tbEdgeEventNotificationsTopic --- .../org/thingsboard/server/queue/discovery/TopicService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/TopicService.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/TopicService.java index bab55eece8..ca080f9481 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/TopicService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/TopicService.java @@ -44,7 +44,7 @@ public class TopicService { @Value("${queue.edge.notifications-topic:tb_edge.notifications}") private String tbEdgeNotificationsTopic; - @Value("${queue.edge.event-notifications-topic:tb_edge.notifications}") + @Value("${queue.edge.event-notifications-topic:tb_edge_event.notifications}") private String tbEdgeEventNotificationsTopic; private final ConcurrentMap tbCoreNotificationTopics = new ConcurrentHashMap<>(); From b29cc4fc413a481ea6ce49b867a90d6aadfe8a51 Mon Sep 17 00:00:00 2001 From: Andrii Landiak Date: Mon, 3 Mar 2025 11:20:04 +0200 Subject: [PATCH 4/5] Fix tupo --- .../org/thingsboard/server/queue/discovery/TopicService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/TopicService.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/TopicService.java index 3e3051ee21..5992083d85 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/TopicService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/TopicService.java @@ -47,7 +47,7 @@ public class TopicService { @Value("${queue.edge.event-notifications-topic:tb_edge_event.notifications}") private String tbEdgeEventNotificationsTopic; - @Value("${queue.calculated_fields.notifications-topic:calculated_field}") + @Value("${queue.calculated_fields.notifications-topic:calculated_field.notifications}") private String tbCalculatedFieldNotificationsTopic; private final ConcurrentMap tbCoreNotificationTopics = new ConcurrentHashMap<>(); From bfef1a3ea3c7068c426ad3c69002c296532849d5 Mon Sep 17 00:00:00 2001 From: Andrii Landiak Date: Mon, 3 Mar 2025 11:21:50 +0200 Subject: [PATCH 5/5] Fix notifications_topic for cf --- application/src/main/resources/thingsboard.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 730dfec128..ec9925bf06 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -1759,7 +1759,7 @@ queue: # Topic name for Calculated Field (CF) compacted states state_topic: "${TB_QUEUE_CF_STATE_TOPIC:tb_cf_state}" # For high-priority notifications that require minimum latency and processing time - notifications_topic: "${TB_QUEUE_CF_NOTIFICATIONS_TOPIC:calculated_field}" + notifications_topic: "${TB_QUEUE_CF_NOTIFICATIONS_TOPIC:calculated_field.notifications}" # Interval in milliseconds to poll messages by CF (Rule Engine) microservices poll_interval: "${TB_QUEUE_CF_POLL_INTERVAL_MS:25}" # Amount of partitions used by CF microservices