Refactoring
This commit is contained in:
parent
a101c9403f
commit
2c7c63c537
@ -36,7 +36,9 @@ public class GeneralEdgeEventFetcher implements EdgeEventFetcher {
|
|||||||
private boolean seqIdNewCycleStarted;
|
private boolean seqIdNewCycleStarted;
|
||||||
private Long maxReadRecordsCount;
|
private Long maxReadRecordsCount;
|
||||||
private final EdgeEventService edgeEventService;
|
private final EdgeEventService edgeEventService;
|
||||||
// Subtract from queueStartTs to ensure no data is lost due to potential misordering of edge events by created_time.
|
// Subtract from queueStartTs to compensate for possible misalignment between `created_time` and `seqId`.
|
||||||
|
// This ensures early events with lower seqId are not skipped due to partitioning by `created_time`.
|
||||||
|
// See: edge_event is partitioned by created_time but sorted by seqId during retrieval.
|
||||||
private final long misorderingCompensationMillis;
|
private final long misorderingCompensationMillis;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|||||||
@ -1480,8 +1480,10 @@ edges:
|
|||||||
no_read_records_sleep: "${EDGES_NO_READ_RECORDS_SLEEP:1000}"
|
no_read_records_sleep: "${EDGES_NO_READ_RECORDS_SLEEP:1000}"
|
||||||
# Number of milliseconds to wait before resending failed batch of edge events to edge
|
# Number of milliseconds to wait before resending failed batch of edge events to edge
|
||||||
sleep_between_batches: "${EDGES_SLEEP_BETWEEN_BATCHES:60000}"
|
sleep_between_batches: "${EDGES_SLEEP_BETWEEN_BATCHES:60000}"
|
||||||
# Number of milliseconds to subtract from queue start timestamp to compensate for potential
|
# Time (in milliseconds) to subtract from the start timestamp when fetching edge events.
|
||||||
# misordering of edge events by created_time. Prevents skipping early events.
|
# This compensates for possible misordering between `created_time` (used for partitioning)
|
||||||
|
# and `seqId` (used for sorting). Without this, events with smaller seqId but larger created_time
|
||||||
|
# might be skipped, especially across partition boundaries.
|
||||||
misordering_compensation_millis: "${EDGES_MISORDERING_COMPENSATION_MILLIS:3600000}"
|
misordering_compensation_millis: "${EDGES_MISORDERING_COMPENSATION_MILLIS:3600000}"
|
||||||
# Max number of high priority edge events per edge session. No persistence - stored in memory
|
# Max number of high priority edge events per edge session. No persistence - stored in memory
|
||||||
max_high_priority_queue_size_per_session: "${EDGES_MAX_HIGH_PRIORITY_QUEUE_SIZE_PER_SESSION:10000}"
|
max_high_priority_queue_size_per_session: "${EDGES_MAX_HIGH_PRIORITY_QUEUE_SIZE_PER_SESSION:10000}"
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user