Merge pull request #13147 from volodymyr-babak/edge-consumer-limits

Enhance Edge Queue Processing with Rate Limiting and Logging Improvements
This commit is contained in:
Andrew Shvayka 2025-04-10 11:45:15 +04:00 committed by GitHub
commit e8eef3a7df
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 44 additions and 30 deletions

View File

@ -507,7 +507,7 @@ public class EdgeMsgConstructorUtils {
JsonObject data = entityData.getAsJsonObject();
builder.setPostTelemetryMsg(JsonConverter.convertToTelemetryProto(data.getAsJsonObject("data"), ts));
} catch (Exception e) {
log.warn("[{}][{}] Can't convert to telemetry proto, entityData [{}]", tenantId, entityId, entityData, e);
log.trace("[{}][{}] Can't convert to telemetry proto, entityData [{}]", tenantId, entityId, entityData, e);
}
break;
case ATTRIBUTES_UPDATED:
@ -522,7 +522,7 @@ public class EdgeMsgConstructorUtils {
builder.setPostAttributeScope(getScopeOfDefault(data));
builder.setAttributeTs(ts);
} catch (Exception e) {
log.warn("[{}][{}] Can't convert to AttributesUpdatedMsg proto, entityData [{}]", tenantId, entityId, entityData, e);
log.trace("[{}][{}] Can't convert to AttributesUpdatedMsg proto, entityData [{}]", tenantId, entityId, entityData, e);
}
break;
case POST_ATTRIBUTES:
@ -533,7 +533,7 @@ public class EdgeMsgConstructorUtils {
builder.setPostAttributeScope(getScopeOfDefault(data));
builder.setAttributeTs(ts);
} catch (Exception e) {
log.warn("[{}][{}] Can't convert to PostAttributesMsg, entityData [{}]", tenantId, entityId, entityData, e);
log.trace("[{}][{}] Can't convert to PostAttributesMsg, entityData [{}]", tenantId, entityId, entityData, e);
}
break;
case ATTRIBUTES_DELETED:
@ -546,7 +546,7 @@ public class EdgeMsgConstructorUtils {
attributeDeleteMsg.build();
builder.setAttributeDeleteMsg(attributeDeleteMsg);
} catch (Exception e) {
log.warn("[{}][{}] Can't convert to AttributeDeleteMsg proto, entityData [{}]", tenantId, entityId, entityData, e);
log.trace("[{}][{}] Can't convert to AttributeDeleteMsg proto, entityData [{}]", tenantId, entityId, entityData, e);
}
break;
}

View File

@ -203,7 +203,7 @@ public abstract class EdgeGrpcSession implements Closeable {
@Override
public void onError(Throwable t) {
log.error("[{}][{}] Stream was terminated due to error:", tenantId, sessionId, t);
log.trace("[{}][{}] Stream was terminated due to error:", tenantId, sessionId, t);
closeSession();
}
@ -255,7 +255,7 @@ public abstract class EdgeGrpcSession implements Closeable {
private void doSync(EdgeSyncCursor cursor) {
if (cursor.hasNext()) {
EdgeEventFetcher next = cursor.getNext();
log.info("[{}][{}] starting sync process, cursor current idx = {}, class = {}",
log.debug("[{}][{}] starting sync process, cursor current idx = {}, class = {}",
tenantId, edge.getId(), cursor.getCurrentIdx(), next.getClass().getSimpleName());
ListenableFuture<Pair<Long, Long>> future = startProcessingEdgeEvents(next);
Futures.addCallback(future, new FutureCallback<>() {
@ -651,7 +651,7 @@ public abstract class EdgeGrpcSession implements Closeable {
default -> log.warn("[{}][{}] Unsupported action type [{}]", tenantId, sessionId, edgeEvent.getAction());
}
} catch (Exception e) {
log.error("[{}][{}] Exception during converting edge event to downlink msg", tenantId, sessionId, e);
log.trace("[{}][{}] Exception during converting edge event to downlink msg", tenantId, sessionId, e);
}
if (downlinkMsg != null) {
result.add(downlinkMsg);
@ -763,7 +763,7 @@ public abstract class EdgeGrpcSession implements Closeable {
try {
outputStream.onNext(responseMsg);
} catch (Exception e) {
log.error("[{}][{}] Failed to send downlink message [{}]", tenantId, sessionId, downlinkMsgStr, e);
log.trace("[{}][{}] Failed to send downlink message [{}]", tenantId, sessionId, downlinkMsgStr, e);
connected = false;
sessionCloseListener.accept(edge, sessionId);
} finally {
@ -909,7 +909,7 @@ public abstract class EdgeGrpcSession implements Closeable {
}
} catch (Exception e) {
String failureMsg = String.format("Can't process uplink msg [%s] from edge", uplinkMsg);
log.error("[{}][{}] Can't process uplink msg [{}]", edge.getTenantId(), sessionId, uplinkMsg, e);
log.trace("[{}][{}] Can't process uplink msg [{}]", edge.getTenantId(), sessionId, uplinkMsg, e);
ctx.getRuleProcessor().process(EdgeCommunicationFailureTrigger.builder().tenantId(edge.getTenantId()).edgeId(edge.getId())
.customerId(edge.getCustomerId()).edgeName(edge.getName()).failureMsg(failureMsg).error(e.getMessage()).build());
return Futures.immediateFailedFuture(e);

View File

@ -1590,11 +1590,24 @@ queue:
# tb_rule_engine.sq:
# - key: max.poll.records
# value: "${TB_QUEUE_KAFKA_SQ_MAX_POLL_RECORDS:1024}"
tb_edge_event.notifications:
# Example of specific consumer properties value per topic for edge event
tb_edge:
# Properties for consumers targeting edge service update topics.
- key: max.poll.records
# Example of specific consumer properties value per topic for edge event
value: "${TB_QUEUE_KAFKA_EDGE_EVENT_MAX_POLL_RECORDS:50}"
# Define the maximum number of records that can be polled from tb_edge topics per request.
value: "${TB_QUEUE_KAFKA_EDGE_EVENTS_MAX_POLL_RECORDS:10}"
tb_edge.notifications:
# Properties for consumers targeting high-priority edge notifications.
# These notifications include RPC calls, lifecycle events, and new queue messages,
# requiring minimal latency and swift processing.
- key: max.poll.records
# Define the maximum number of records that can be polled from tb_edge.notifications.<SERVICE_ID> topics.
value: "${TB_QUEUE_KAFKA_EDGE_HP_EVENTS_MAX_POLL_RECORDS:10}"
tb_edge_event.notifications:
# Properties for consumers targeting topics that carry downlinks meant for a specific edge.
# Topic names are dynamically constructed using tenant and edge identifiers.
- key: max.poll.records
# Define the maximum number of records that can be polled from tb_edge_event.notifications.<TENANT_ID>.<EDGE_ID> topics.
value: "${TB_QUEUE_KAFKA_EDGE_NOTIFICATIONS_MAX_POLL_RECORDS:10}"
tb_housekeeper:
# Consumer properties for Housekeeper tasks topic
- key: max.poll.records
@ -1841,11 +1854,13 @@ queue:
# Interval in milliseconds to poll messages
poll_interval: "${TB_QUEUE_TRANSPORT_NOTIFICATIONS_POLL_INTERVAL_MS:25}"
edge:
# Default topic name
# Topic name used to notify the edge service of entity updates, assignments, etc.
topic: "${TB_QUEUE_EDGE_TOPIC:tb_edge}"
# For high-priority notifications that require minimum latency and processing time
# Topic prefix for high-priority edge notifications (rpc, lifecycle, new messages in queue) that require minimum latency and processing time.
# Each tb-core has its own topic: <PREFIX>.<SERVICE_ID>
notifications_topic: "${TB_QUEUE_EDGE_NOTIFICATIONS_TOPIC:tb_edge.notifications}"
# For edge events messages
# Topic prefix for downlinks to be pushed to a specific edge.
# Every edge has its own unique topic: <PREFIX>.<TENANT_ID>.<EDGE_ID>
event_notifications_topic: "${TB_QUEUE_EDGE_EVENT_NOTIFICATIONS_TOPIC:tb_edge_event.notifications}"
# Amount of partitions used by Edge services
partitions: "${TB_QUEUE_EDGE_PARTITIONS:10}"

View File

@ -38,7 +38,7 @@ public class TopicService {
@Value("${queue.rule-engine.notifications-topic:tb_rule_engine.notifications}")
private String tbRuleEngineNotificationsTopic;
@Value("${queue.transport.notifications-topics:tb_transport.notifications}")
@Value("${queue.transport.notifications-topic:tb_transport.notifications}")
private String tbTransportNotificationsTopic;
@Value("${queue.edge.notifications-topic:tb_edge.notifications}")

View File

@ -50,8 +50,6 @@ import java.util.Properties;
@Component
public class TbKafkaSettings {
private static final List<String> DYNAMIC_TOPICS = List.of("tb_edge_event.notifications");
@Value("${queue.kafka.bootstrap.servers}")
private String servers;
@ -163,18 +161,20 @@ public class TbKafkaSettings {
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
consumerPropertiesPerTopic
.getOrDefault(topic, Collections.emptyList())
.forEach(kv -> props.put(kv.getKey(), kv.getValue()));
if (topic != null) {
DYNAMIC_TOPICS.stream()
.filter(topic::startsWith)
.findFirst()
.ifPresent(prefix -> consumerPropertiesPerTopic.getOrDefault(prefix, Collections.emptyList())
.forEach(kv -> props.put(kv.getKey(), kv.getValue())));
List<TbProperty> properties = consumerPropertiesPerTopic.get(topic);
if (properties == null) {
for (Map.Entry<String, List<TbProperty>> entry : consumerPropertiesPerTopic.entrySet()) {
if (topic.startsWith(entry.getKey())) {
properties = entry.getValue();
break;
}
}
}
if (properties != null) {
properties.forEach(kv -> props.put(kv.getKey(), kv.getValue()));
}
}
return props;
}

View File

@ -115,7 +115,6 @@ public class TbCoreQueueProducerProvider implements TbQueueProducerProvider {
return toHousekeeper;
}
@Override
public TbQueueProducer<TbProtoQueueMsg<ToEdgeMsg>> getTbEdgeMsgProducer() {
return toEdge;