diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 4dfc51807a..ec9925bf06 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -1659,6 +1659,8 @@ queue: core: # Default topic name topic: "${TB_QUEUE_CORE_TOPIC:tb_core}" + # For high-priority notifications that require minimum latency and processing time + notifications_topic: "${TB_QUEUE_CORE_NOTIFICATIONS_TOPIC:tb_core.notifications}" # Interval in milliseconds to poll messages by Core microservices poll-interval: "${TB_QUEUE_CORE_POLL_INTERVAL_MS:25}" # Amount of partitions used by Core microservices @@ -1734,6 +1736,8 @@ queue: rule-engine: # Deprecated. It will be removed in the nearest releases topic: "${TB_QUEUE_RULE_ENGINE_TOPIC:tb_rule_engine}" + # For high-priority notifications that require minimum latency and processing time + notifications_topic: "${TB_QUEUE_RULE_ENGINE_NOTIFICATIONS_TOPIC:tb_rule_engine.notifications}" # Interval in milliseconds to poll messages by Rule Engine poll-interval: "${TB_QUEUE_RULE_ENGINE_POLL_INTERVAL_MS:25}" # Timeout for processing a message pack of Rule Engine @@ -1754,6 +1758,8 @@ queue: event_topic: "${TB_QUEUE_CF_EVENT_TOPIC:tb_cf_event}" # Topic name for Calculated Field (CF) compacted states state_topic: "${TB_QUEUE_CF_STATE_TOPIC:tb_cf_state}" + # For high-priority notifications that require minimum latency and processing time + notifications_topic: "${TB_QUEUE_CF_NOTIFICATIONS_TOPIC:calculated_field.notifications}" # Interval in milliseconds to poll messages by CF (Rule Engine) microservices poll_interval: "${TB_QUEUE_CF_POLL_INTERVAL_MS:25}" # Amount of partitions used by CF microservices @@ -1772,6 +1778,10 @@ queue: edge: # Default topic name topic: "${TB_QUEUE_EDGE_TOPIC:tb_edge}" + # For high-priority notifications that require minimum latency and processing time + notifications_topic: "${TB_QUEUE_EDGE_NOTIFICATIONS_TOPIC:tb_edge.notifications}" + # For edge events messages + event_notifications_topic: "${TB_QUEUE_EDGE_EVENT_NOTIFICATIONS_TOPIC:tb_edge_event.notifications}" # Amount of partitions used by Edge services partitions: "${TB_QUEUE_EDGE_PARTITIONS:10}" # Poll interval for topics related to Edge services diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/TopicService.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/TopicService.java index 81be9e2207..5992083d85 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/TopicService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/TopicService.java @@ -32,6 +32,24 @@ public class TopicService { @Value("${queue.prefix:}") private String prefix; + @Value("${queue.core.notifications-topic:tb_core.notifications}") + private String tbCoreNotificationsTopic; + + @Value("${queue.rule-engine.notifications-topic:tb_rule_engine.notifications}") + private String tbRuleEngineNotificationsTopic; + + @Value("${queue.transport.notifications-topics:tb_transport.notifications}") + private String tbTransportNotificationsTopic; + + @Value("${queue.edge.notifications-topic:tb_edge.notifications}") + private String tbEdgeNotificationsTopic; + + @Value("${queue.edge.event-notifications-topic:tb_edge_event.notifications}") + private String tbEdgeEventNotificationsTopic; + + @Value("${queue.calculated_fields.notifications-topic:calculated_field.notifications}") + private String tbCalculatedFieldNotificationsTopic; + private final ConcurrentMap tbCoreNotificationTopics = new ConcurrentHashMap<>(); private final ConcurrentMap tbRuleEngineNotificationTopics = new ConcurrentHashMap<>(); private final ConcurrentMap tbEdgeNotificationTopics = new ConcurrentHashMap<>(); @@ -48,24 +66,32 @@ public class TopicService { public TopicPartitionInfo getNotificationsTopic(ServiceType serviceType, String serviceId) { return switch (serviceType) { case TB_CORE -> tbCoreNotificationTopics.computeIfAbsent(serviceId, - id -> buildNotificationsTopicPartitionInfo(serviceType, serviceId)); + id -> buildNotificationsTopicPartitionInfo(tbCoreNotificationsTopic, serviceId)); case TB_RULE_ENGINE -> tbRuleEngineNotificationTopics.computeIfAbsent(serviceId, - id -> buildNotificationsTopicPartitionInfo(serviceType, serviceId)); - default -> buildNotificationsTopicPartitionInfo(serviceType, serviceId); + id -> buildNotificationsTopicPartitionInfo(tbRuleEngineNotificationsTopic, serviceId)); + case TB_TRANSPORT -> buildNotificationsTopicPartitionInfo(tbTransportNotificationsTopic, serviceId); + default -> throw new IllegalStateException("Unexpected service type: " + serviceType); }; } + private TopicPartitionInfo buildNotificationsTopicPartitionInfo(String topic, String serviceId) { + return buildTopicPartitionInfo(buildNotificationTopicName(topic, serviceId), null, null, false); + } + + public TopicPartitionInfo buildTopicPartitionInfo(String topic, TenantId tenantId, Integer partition, boolean myPartition) { + return new TopicPartitionInfo(buildTopicName(topic), tenantId, partition, myPartition); + } + public TopicPartitionInfo getEdgeNotificationsTopic(String serviceId) { return tbEdgeNotificationTopics.computeIfAbsent(serviceId, id -> buildEdgeNotificationsTopicPartitionInfo(serviceId)); } private TopicPartitionInfo buildEdgeNotificationsTopicPartitionInfo(String serviceId) { - return buildTopicPartitionInfo("tb_edge.notifications." + serviceId, null, null, false); + return buildTopicPartitionInfo(buildNotificationTopicName(tbEdgeNotificationsTopic, serviceId), null, null, false); } public TopicPartitionInfo getCalculatedFieldNotificationsTopic(String serviceId) { - return tbCalculatedFieldNotificationTopics.computeIfAbsent(serviceId, - id -> buildNotificationsTopicPartitionInfo("calculated_field", serviceId)); + return tbCalculatedFieldNotificationTopics.computeIfAbsent(serviceId, id -> buildNotificationsTopicPartitionInfo(tbCalculatedFieldNotificationsTopic, serviceId)); } public TopicPartitionInfo getEdgeEventNotificationsTopic(TenantId tenantId, EdgeId edgeId) { @@ -73,25 +99,17 @@ public class TopicService { } public TopicPartitionInfo buildEdgeEventNotificationsTopicPartitionInfo(TenantId tenantId, EdgeId edgeId) { - return buildTopicPartitionInfo("tb_edge_event.notifications." + tenantId + "." + edgeId, null, null, false); - } - - private TopicPartitionInfo buildNotificationsTopicPartitionInfo(ServiceType serviceType, String serviceId) { - return buildNotificationsTopicPartitionInfo(serviceType.name().toLowerCase(), serviceId); - } - - private TopicPartitionInfo buildNotificationsTopicPartitionInfo(String serviceType, String serviceId) { - return buildTopicPartitionInfo(serviceType + ".notifications." + serviceId, null, null, false); - } - - public TopicPartitionInfo buildTopicPartitionInfo(String topic, TenantId tenantId, Integer partition, boolean myPartition) { - return new TopicPartitionInfo(buildTopicName(topic), tenantId, partition, myPartition); + return buildTopicPartitionInfo(tbEdgeEventNotificationsTopic + "." + tenantId + "." + edgeId, null, null, false); } public String buildTopicName(String topic) { return prefix.isBlank() ? topic : prefix + "." + topic; } + private String buildNotificationTopicName(String topic, String serviceId) { + return topic + "." + serviceId; + } + public String buildConsumerGroupId(String servicePrefix, TenantId tenantId, String queueName, Integer partitionId) { return this.buildTopicName( servicePrefix + queueName diff --git a/msa/vc-executor/src/main/resources/tb-vc-executor.yml b/msa/vc-executor/src/main/resources/tb-vc-executor.yml index 1d4d1592cf..f0b1426cc7 100644 --- a/msa/vc-executor/src/main/resources/tb-vc-executor.yml +++ b/msa/vc-executor/src/main/resources/tb-vc-executor.yml @@ -151,6 +151,8 @@ queue: core: # Default topic name topic: "${TB_QUEUE_CORE_TOPIC:tb_core}" + # For high-priority notifications that require minimum latency and processing time + notifications_topic: "${TB_QUEUE_CORE_NOTIFICATIONS_TOPIC:tb_core.notifications}" # Interval in milliseconds to poll messages by Core microservices poll-interval: "${TB_QUEUE_CORE_POLL_INTERVAL_MS:25}" # Amount of partitions used by Core microservices diff --git a/transport/coap/src/main/resources/tb-coap-transport.yml b/transport/coap/src/main/resources/tb-coap-transport.yml index f1d325b81c..f8df4bb55e 100644 --- a/transport/coap/src/main/resources/tb-coap-transport.yml +++ b/transport/coap/src/main/resources/tb-coap-transport.yml @@ -347,6 +347,8 @@ queue: core: # Default topic name topic: "${TB_QUEUE_CORE_TOPIC:tb_core}" + # For high-priority notifications that require minimum latency and processing time + notifications_topic: "${TB_QUEUE_CORE_NOTIFICATIONS_TOPIC:tb_core.notifications}" # Interval in milliseconds to poll messages by Core microservices poll-interval: "${TB_QUEUE_CORE_POLL_INTERVAL_MS:25}" # Amount of partitions used by Core microservices @@ -377,6 +379,8 @@ queue: rule-engine: # Deprecated. It will be removed in the nearest releases topic: "${TB_QUEUE_RULE_ENGINE_TOPIC:tb_rule_engine}" + # For high-priority notifications that require minimum latency and processing time + notifications_topic: "${TB_QUEUE_RULE_ENGINE_NOTIFICATIONS_TOPIC:tb_rule_engine.notifications}" # Interval in milliseconds to poll messages by Rule Engine poll-interval: "${TB_QUEUE_RULE_ENGINE_POLL_INTERVAL_MS:25}" # Timeout for processing a message pack of Rule Engine diff --git a/transport/http/src/main/resources/tb-http-transport.yml b/transport/http/src/main/resources/tb-http-transport.yml index eca40b19fb..d282b50ff3 100644 --- a/transport/http/src/main/resources/tb-http-transport.yml +++ b/transport/http/src/main/resources/tb-http-transport.yml @@ -296,6 +296,8 @@ queue: core: # Default topic name topic: "${TB_QUEUE_CORE_TOPIC:tb_core}" + # For high-priority notifications that require minimum latency and processing time + notifications_topic: "${TB_QUEUE_CORE_NOTIFICATIONS_TOPIC:tb_core.notifications}" # Interval in milliseconds to poll messages by Core microservices poll-interval: "${TB_QUEUE_CORE_POLL_INTERVAL_MS:25}" # Amount of partitions used by Core microservices @@ -326,6 +328,8 @@ queue: rule-engine: # Deprecated. It will be removed in the nearest releases topic: "${TB_QUEUE_RULE_ENGINE_TOPIC:tb_rule_engine}" + # For high-priority notifications that require minimum latency and processing time + notifications_topic: "${TB_QUEUE_RULE_ENGINE_NOTIFICATIONS_TOPIC:tb_rule_engine.notifications}" # Interval in milliseconds to poll messages by Rule Engine poll-interval: "${TB_QUEUE_RULE_ENGINE_POLL_INTERVAL_MS:25}" # Timeout for processing a message pack of Rule Engine diff --git a/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml b/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml index 149fc2a6e2..a198613e11 100644 --- a/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml +++ b/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml @@ -397,6 +397,8 @@ queue: core: # Default topic name topic: "${TB_QUEUE_CORE_TOPIC:tb_core}" + # For high-priority notifications that require minimum latency and processing time + notifications_topic: "${TB_QUEUE_CORE_NOTIFICATIONS_TOPIC:tb_core.notifications}" # Interval in milliseconds to poll messages by Core microservices poll-interval: "${TB_QUEUE_CORE_POLL_INTERVAL_MS:25}" # Amount of partitions used by Core microservices @@ -427,6 +429,8 @@ queue: rule-engine: # Deprecated. It will be removed in the nearest releases topic: "${TB_QUEUE_RULE_ENGINE_TOPIC:tb_rule_engine}" + # For high-priority notifications that require minimum latency and processing time + notifications_topic: "${TB_QUEUE_RULE_ENGINE_NOTIFICATIONS_TOPIC:tb_rule_engine.notifications}" # Interval in milliseconds to poll messages by Rule Engine poll-interval: "${TB_QUEUE_RULE_ENGINE_POLL_INTERVAL_MS:25}" # Timeout for processing a message pack of Rule Engine diff --git a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml index b35ce5e7be..fb75203499 100644 --- a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml +++ b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml @@ -330,6 +330,8 @@ queue: core: # Default topic name topic: "${TB_QUEUE_CORE_TOPIC:tb_core}" + # For high-priority notifications that require minimum latency and processing time + notifications_topic: "${TB_QUEUE_CORE_NOTIFICATIONS_TOPIC:tb_core.notifications}" # Interval in milliseconds to poll messages by Core microservices poll-interval: "${TB_QUEUE_CORE_POLL_INTERVAL_MS:25}" # Amount of partitions used by Core microservices @@ -360,6 +362,8 @@ queue: rule-engine: # Deprecated. It will be removed in the nearest releases topic: "${TB_QUEUE_RULE_ENGINE_TOPIC:tb_rule_engine}" + # For high-priority notifications that require minimum latency and processing time + notifications_topic: "${TB_QUEUE_RULE_ENGINE_NOTIFICATIONS_TOPIC:tb_rule_engine.notifications}" # Interval in milliseconds to poll messages by Rule Engine poll-interval: "${TB_QUEUE_RULE_ENGINE_POLL_INTERVAL_MS:25}" # Timeout for processing a message pack of Rule Engine diff --git a/transport/snmp/src/main/resources/tb-snmp-transport.yml b/transport/snmp/src/main/resources/tb-snmp-transport.yml index 4de8a0e2c5..281e221674 100644 --- a/transport/snmp/src/main/resources/tb-snmp-transport.yml +++ b/transport/snmp/src/main/resources/tb-snmp-transport.yml @@ -283,6 +283,8 @@ queue: core: # Default topic name topic: "${TB_QUEUE_CORE_TOPIC:tb_core}" + # For high-priority notifications that require minimum latency and processing time + notifications_topic: "${TB_QUEUE_CORE_NOTIFICATIONS_TOPIC:tb_core.notifications}" # Interval in milliseconds to poll messages by Core microservices poll-interval: "${TB_QUEUE_CORE_POLL_INTERVAL_MS:25}" # Amount of partitions used by Core microservices @@ -313,6 +315,8 @@ queue: rule-engine: # Deprecated. It will be removed in the nearest releases topic: "${TB_QUEUE_RULE_ENGINE_TOPIC:tb_rule_engine}" + # For high-priority notifications that require minimum latency and processing time + notifications_topic: "${TB_QUEUE_RULE_ENGINE_NOTIFICATIONS_TOPIC:tb_rule_engine.notifications}" # Interval in milliseconds to poll messages by Rule Engine poll-interval: "${TB_QUEUE_RULE_ENGINE_POLL_INTERVAL_MS:25}" # Timeout for processing a message pack of Rule Engine