From c443b5e8c689b4b1be13a3a29c5d9efcf029f933 Mon Sep 17 00:00:00 2001 From: Volodymyr Babak Date: Wed, 9 Apr 2025 16:59:37 +0300 Subject: [PATCH 1/2] Add max poll records for edge consumers. silent errors in case of processing edge events --- .../service/edge/EdgeMsgConstructorUtils.java | 8 ++--- .../service/edge/rpc/EdgeGrpcSession.java | 10 +++---- .../ttl/KafkaEdgeTopicsCleanUpService.java | 2 +- .../src/main/resources/thingsboard.yml | 29 ++++++++++++++----- .../server/queue/discovery/TopicService.java | 12 ++++---- .../server/queue/kafka/TbKafkaSettings.java | 2 +- .../provider/TbCoreQueueProducerProvider.java | 1 - 7 files changed, 39 insertions(+), 25 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/edge/EdgeMsgConstructorUtils.java b/application/src/main/java/org/thingsboard/server/service/edge/EdgeMsgConstructorUtils.java index 9bcd8fc373..d996a1dbc9 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/EdgeMsgConstructorUtils.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/EdgeMsgConstructorUtils.java @@ -507,7 +507,7 @@ public class EdgeMsgConstructorUtils { JsonObject data = entityData.getAsJsonObject(); builder.setPostTelemetryMsg(JsonConverter.convertToTelemetryProto(data.getAsJsonObject("data"), ts)); } catch (Exception e) { - log.warn("[{}][{}] Can't convert to telemetry proto, entityData [{}]", tenantId, entityId, entityData, e); + log.trace("[{}][{}] Can't convert to telemetry proto, entityData [{}]", tenantId, entityId, entityData, e); } break; case ATTRIBUTES_UPDATED: @@ -522,7 +522,7 @@ public class EdgeMsgConstructorUtils { builder.setPostAttributeScope(getScopeOfDefault(data)); builder.setAttributeTs(ts); } catch (Exception e) { - log.warn("[{}][{}] Can't convert to AttributesUpdatedMsg proto, entityData [{}]", tenantId, entityId, entityData, e); + log.trace("[{}][{}] Can't convert to AttributesUpdatedMsg proto, entityData [{}]", tenantId, entityId, entityData, e); } break; case POST_ATTRIBUTES: @@ -533,7 +533,7 @@ public class EdgeMsgConstructorUtils { builder.setPostAttributeScope(getScopeOfDefault(data)); builder.setAttributeTs(ts); } catch (Exception e) { - log.warn("[{}][{}] Can't convert to PostAttributesMsg, entityData [{}]", tenantId, entityId, entityData, e); + log.trace("[{}][{}] Can't convert to PostAttributesMsg, entityData [{}]", tenantId, entityId, entityData, e); } break; case ATTRIBUTES_DELETED: @@ -546,7 +546,7 @@ public class EdgeMsgConstructorUtils { attributeDeleteMsg.build(); builder.setAttributeDeleteMsg(attributeDeleteMsg); } catch (Exception e) { - log.warn("[{}][{}] Can't convert to AttributeDeleteMsg proto, entityData [{}]", tenantId, entityId, entityData, e); + log.trace("[{}][{}] Can't convert to AttributeDeleteMsg proto, entityData [{}]", tenantId, entityId, entityData, e); } break; } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java index ecd0c6d9f3..9ce5973639 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java @@ -203,7 +203,7 @@ public abstract class EdgeGrpcSession implements Closeable { @Override public void onError(Throwable t) { - log.error("[{}][{}] Stream was terminated due to error:", tenantId, sessionId, t); + log.trace("[{}][{}] Stream was terminated due to error:", tenantId, sessionId, t); closeSession(); } @@ -255,7 +255,7 @@ public abstract class EdgeGrpcSession implements Closeable { private void doSync(EdgeSyncCursor cursor) { if (cursor.hasNext()) { EdgeEventFetcher next = cursor.getNext(); - log.info("[{}][{}] starting sync process, cursor current idx = {}, class = {}", + log.debug("[{}][{}] starting sync process, cursor current idx = {}, class = {}", tenantId, edge.getId(), cursor.getCurrentIdx(), next.getClass().getSimpleName()); ListenableFuture> future = startProcessingEdgeEvents(next); Futures.addCallback(future, new FutureCallback<>() { @@ -651,7 +651,7 @@ public abstract class EdgeGrpcSession implements Closeable { default -> log.warn("[{}][{}] Unsupported action type [{}]", tenantId, sessionId, edgeEvent.getAction()); } } catch (Exception e) { - log.error("[{}][{}] Exception during converting edge event to downlink msg", tenantId, sessionId, e); + log.trace("[{}][{}] Exception during converting edge event to downlink msg", tenantId, sessionId, e); } if (downlinkMsg != null) { result.add(downlinkMsg); @@ -763,7 +763,7 @@ public abstract class EdgeGrpcSession implements Closeable { try { outputStream.onNext(responseMsg); } catch (Exception e) { - log.error("[{}][{}] Failed to send downlink message [{}]", tenantId, sessionId, downlinkMsgStr, e); + log.trace("[{}][{}] Failed to send downlink message [{}]", tenantId, sessionId, downlinkMsgStr, e); connected = false; sessionCloseListener.accept(edge, sessionId); } finally { @@ -909,7 +909,7 @@ public abstract class EdgeGrpcSession implements Closeable { } } catch (Exception e) { String failureMsg = String.format("Can't process uplink msg [%s] from edge", uplinkMsg); - log.error("[{}][{}] Can't process uplink msg [{}]", edge.getTenantId(), sessionId, uplinkMsg, e); + log.trace("[{}][{}] Can't process uplink msg [{}]", edge.getTenantId(), sessionId, uplinkMsg, e); ctx.getRuleProcessor().process(EdgeCommunicationFailureTrigger.builder().tenantId(edge.getTenantId()).edgeId(edge.getId()) .customerId(edge.getCustomerId()).edgeName(edge.getName()).failureMsg(failureMsg).error(e.getMessage()).build()); return Futures.immediateFailedFuture(e); diff --git a/application/src/main/java/org/thingsboard/server/service/ttl/KafkaEdgeTopicsCleanUpService.java b/application/src/main/java/org/thingsboard/server/service/ttl/KafkaEdgeTopicsCleanUpService.java index 73712542fd..c32ed92e31 100644 --- a/application/src/main/java/org/thingsboard/server/service/ttl/KafkaEdgeTopicsCleanUpService.java +++ b/application/src/main/java/org/thingsboard/server/service/ttl/KafkaEdgeTopicsCleanUpService.java @@ -62,7 +62,7 @@ public class KafkaEdgeTopicsCleanUpService extends AbstractCleanUpService { @Value("${sql.ttl.edge_events.edge_events_ttl:2628000}") private long ttlSeconds; - @Value("${queue.edge.event-notifications-topic:tb_edge_event.notifications}") + @Value("${queue.edge.event_notifications_topic:tb_edge_event.notifications}") private String tbEdgeEventNotificationsTopic; public KafkaEdgeTopicsCleanUpService(PartitionService partitionService, EdgeService edgeService, diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index ecd4ddebb7..b4d178a737 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -1590,11 +1590,24 @@ queue: # tb_rule_engine.sq: # - key: max.poll.records # value: "${TB_QUEUE_KAFKA_SQ_MAX_POLL_RECORDS:1024}" - tb_edge_event.notifications: - # Example of specific consumer properties value per topic for edge event + tb_edge: + # Properties for consumers targeting edge service update topics. - key: max.poll.records - # Example of specific consumer properties value per topic for edge event - value: "${TB_QUEUE_KAFKA_EDGE_EVENT_MAX_POLL_RECORDS:50}" + # Define the maximum number of records that can be polled from tb_edge topics per request. + value: "${TB_QUEUE_KAFKA_EDGE_EVENTS_MAX_POLL_RECORDS:10}" + tb_edge.notifications: + # Properties for consumers targeting high-priority edge notifications. + # These notifications include RPC calls, lifecycle events, and new queue messages, + # requiring minimal latency and swift processing. + - key: max.poll.records + # Define the maximum number of records that can be polled from tb_edge.notifications. topics. + value: "${TB_QUEUE_KAFKA_EDGE_HP_EVENTS_MAX_POLL_RECORDS:10}" + tb_edge_event.notifications: + # Properties for consumers targeting downlinks meant for specific edge topics. + # Topic names are dynamically constructed using tenant and edge identifiers. + - key: max.poll.records + # Define the maximum number of records that can be polled from tb_edge_event.notifications.. topics. + value: "${TB_QUEUE_KAFKA_EDGE_NOTIFICATIONS_MAX_POLL_RECORDS:10}" tb_housekeeper: # Consumer properties for Housekeeper tasks topic - key: max.poll.records @@ -1841,11 +1854,13 @@ queue: # Interval in milliseconds to poll messages poll_interval: "${TB_QUEUE_TRANSPORT_NOTIFICATIONS_POLL_INTERVAL_MS:25}" edge: - # Default topic name + # Topic name to notify edge service on entity updates, assignment, etc. topic: "${TB_QUEUE_EDGE_TOPIC:tb_edge}" - # For high-priority notifications that require minimum latency and processing time + # Topic prefix for high-priority edge notifications (rpc, lifecycle, new messages in queue) that require minimum latency and processing time. + # Each tb-core has its own topic: . notifications_topic: "${TB_QUEUE_EDGE_NOTIFICATIONS_TOPIC:tb_edge.notifications}" - # For edge events messages + # Topic prefix for downlinks to be pushed to specific edge. + # Every edge has its own unique topic: .. event_notifications_topic: "${TB_QUEUE_EDGE_EVENT_NOTIFICATIONS_TOPIC:tb_edge_event.notifications}" # Amount of partitions used by Edge services partitions: "${TB_QUEUE_EDGE_PARTITIONS:10}" diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/TopicService.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/TopicService.java index 42263138c1..4b6c0aa139 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/TopicService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/TopicService.java @@ -32,22 +32,22 @@ public class TopicService { @Value("${queue.prefix:}") private String prefix; - @Value("${queue.core.notifications-topic:tb_core.notifications}") + @Value("${queue.core.notifications_topic:tb_core.notifications}") private String tbCoreNotificationsTopic; - @Value("${queue.rule-engine.notifications-topic:tb_rule_engine.notifications}") + @Value("${queue.rule-engine.notifications_topic:tb_rule_engine.notifications}") private String tbRuleEngineNotificationsTopic; - @Value("${queue.transport.notifications-topics:tb_transport.notifications}") + @Value("${queue.transport.notifications_topics:tb_transport.notifications}") private String tbTransportNotificationsTopic; - @Value("${queue.edge.notifications-topic:tb_edge.notifications}") + @Value("${queue.edge.notifications_topic:tb_edge.notifications}") private String tbEdgeNotificationsTopic; - @Value("${queue.edge.event-notifications-topic:tb_edge_event.notifications}") + @Value("${queue.edge.event_notifications_topic:tb_edge_event.notifications}") private String tbEdgeEventNotificationsTopic; - @Value("${queue.calculated-fields.notifications-topic:calculated_field.notifications}") + @Value("${queue.calculated-fields.notifications_topic:calculated_field.notifications}") private String tbCalculatedFieldNotificationsTopic; private final ConcurrentMap tbCoreNotificationTopics = new ConcurrentHashMap<>(); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaSettings.java b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaSettings.java index 181d422f16..d447d92cb3 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaSettings.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaSettings.java @@ -50,7 +50,7 @@ import java.util.Properties; @Component public class TbKafkaSettings { - private static final List DYNAMIC_TOPICS = List.of("tb_edge_event.notifications"); + private static final List DYNAMIC_TOPICS = List.of("tb_edge.notifications", "tb_edge_event.notifications"); @Value("${queue.kafka.bootstrap.servers}") private String servers; diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbCoreQueueProducerProvider.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbCoreQueueProducerProvider.java index 7c3e415e9f..98a3d78304 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbCoreQueueProducerProvider.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbCoreQueueProducerProvider.java @@ -115,7 +115,6 @@ public class TbCoreQueueProducerProvider implements TbQueueProducerProvider { return toHousekeeper; } - @Override public TbQueueProducer> getTbEdgeMsgProducer() { return toEdge; From 68289044c578f05596abceda500625c06e6af545 Mon Sep 17 00:00:00 2001 From: Volodymyr Babak Date: Wed, 9 Apr 2025 17:56:47 +0300 Subject: [PATCH 2/2] TbKafkaSettings - use startswith strategy for consumer props in case direct .get return null --- .../ttl/KafkaEdgeTopicsCleanUpService.java | 2 +- .../server/queue/discovery/TopicService.java | 12 +++++----- .../server/queue/kafka/TbKafkaSettings.java | 24 +++++++++---------- 3 files changed, 19 insertions(+), 19 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/ttl/KafkaEdgeTopicsCleanUpService.java b/application/src/main/java/org/thingsboard/server/service/ttl/KafkaEdgeTopicsCleanUpService.java index c32ed92e31..73712542fd 100644 --- a/application/src/main/java/org/thingsboard/server/service/ttl/KafkaEdgeTopicsCleanUpService.java +++ b/application/src/main/java/org/thingsboard/server/service/ttl/KafkaEdgeTopicsCleanUpService.java @@ -62,7 +62,7 @@ public class KafkaEdgeTopicsCleanUpService extends AbstractCleanUpService { @Value("${sql.ttl.edge_events.edge_events_ttl:2628000}") private long ttlSeconds; - @Value("${queue.edge.event_notifications_topic:tb_edge_event.notifications}") + @Value("${queue.edge.event-notifications-topic:tb_edge_event.notifications}") private String tbEdgeEventNotificationsTopic; public KafkaEdgeTopicsCleanUpService(PartitionService partitionService, EdgeService edgeService, diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/TopicService.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/TopicService.java index 4b6c0aa139..7271e1cb10 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/TopicService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/TopicService.java @@ -32,22 +32,22 @@ public class TopicService { @Value("${queue.prefix:}") private String prefix; - @Value("${queue.core.notifications_topic:tb_core.notifications}") + @Value("${queue.core.notifications-topic:tb_core.notifications}") private String tbCoreNotificationsTopic; - @Value("${queue.rule-engine.notifications_topic:tb_rule_engine.notifications}") + @Value("${queue.rule-engine.notifications-topic:tb_rule_engine.notifications}") private String tbRuleEngineNotificationsTopic; - @Value("${queue.transport.notifications_topics:tb_transport.notifications}") + @Value("${queue.transport.notifications-topic:tb_transport.notifications}") private String tbTransportNotificationsTopic; - @Value("${queue.edge.notifications_topic:tb_edge.notifications}") + @Value("${queue.edge.notifications-topic:tb_edge.notifications}") private String tbEdgeNotificationsTopic; - @Value("${queue.edge.event_notifications_topic:tb_edge_event.notifications}") + @Value("${queue.edge.event-notifications-topic:tb_edge_event.notifications}") private String tbEdgeEventNotificationsTopic; - @Value("${queue.calculated-fields.notifications_topic:calculated_field.notifications}") + @Value("${queue.calculated-fields.notifications-topic:calculated_field.notifications}") private String tbCalculatedFieldNotificationsTopic; private final ConcurrentMap tbCoreNotificationTopics = new ConcurrentHashMap<>(); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaSettings.java b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaSettings.java index d447d92cb3..82b5af179f 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaSettings.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaSettings.java @@ -50,8 +50,6 @@ import java.util.Properties; @Component public class TbKafkaSettings { - private static final List DYNAMIC_TOPICS = List.of("tb_edge.notifications", "tb_edge_event.notifications"); - @Value("${queue.kafka.bootstrap.servers}") private String servers; @@ -163,18 +161,20 @@ public class TbKafkaSettings { props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); - consumerPropertiesPerTopic - .getOrDefault(topic, Collections.emptyList()) - .forEach(kv -> props.put(kv.getKey(), kv.getValue())); - if (topic != null) { - DYNAMIC_TOPICS.stream() - .filter(topic::startsWith) - .findFirst() - .ifPresent(prefix -> consumerPropertiesPerTopic.getOrDefault(prefix, Collections.emptyList()) - .forEach(kv -> props.put(kv.getKey(), kv.getValue()))); + List properties = consumerPropertiesPerTopic.get(topic); + if (properties == null) { + for (Map.Entry> entry : consumerPropertiesPerTopic.entrySet()) { + if (topic.startsWith(entry.getKey())) { + properties = entry.getValue(); + break; + } + } + } + if (properties != null) { + properties.forEach(kv -> props.put(kv.getKey(), kv.getValue())); + } } - return props; }