Merge pull request #13789 from jekka001/edge-misordering-compensation-config

Edge Misordering Compensation Config
This commit is contained in:
Viacheslav Klimov 2025-09-11 10:49:28 +03:00 committed by GitHub
commit 3f7c6df2e3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 14 additions and 6 deletions

View File

@ -29,4 +29,6 @@ public class EdgeEventStorageSettings {
private long noRecordsSleepInterval;
@Value("${edges.storage.sleep_between_batches}")
private long sleepIntervalBetweenBatches;
@Value("${edges.storage.misordering_compensation_millis:60000}")
private long misorderingCompensationMillis;
}

View File

@ -589,7 +589,8 @@ public abstract class EdgeGrpcSession implements Closeable {
previousStartSeqId,
false,
Integer.toUnsignedLong(ctx.getEdgeEventStorageSettings().getMaxReadRecordsCount()),
ctx.getEdgeEventService());
ctx.getEdgeEventService(),
ctx.getEdgeEventStorageSettings().getMisorderingCompensationMillis());
log.trace("[{}][{}] starting processing edge events, previousStartTs = {}, previousStartSeqId = {}",
tenantId, edge.getId(), previousStartTs, previousStartSeqId);
Futures.addCallback(startProcessingEdgeEvents(fetcher), new FutureCallback<>() {

View File

@ -26,13 +26,9 @@ import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.common.data.page.TimePageLink;
import org.thingsboard.server.dao.edge.EdgeEventService;
import java.util.concurrent.TimeUnit;
@AllArgsConstructor
@Slf4j
public class GeneralEdgeEventFetcher implements EdgeEventFetcher {
// Subtract from queueStartTs to ensure no data is lost due to potential misordering of edge events by created_time.
private static final long MISORDERING_COMPENSATION_MILLIS = TimeUnit.SECONDS.toMillis(60);
private final Long queueStartTs;
private Long seqIdStart;
@ -40,6 +36,10 @@ public class GeneralEdgeEventFetcher implements EdgeEventFetcher {
private boolean seqIdNewCycleStarted;
private Long maxReadRecordsCount;
private final EdgeEventService edgeEventService;
// Subtract from queueStartTs to compensate for possible misalignment between `created_time` and `seqId`.
// This ensures early events with lower seqId are not skipped due to partitioning by `created_time`.
// See: edge_event is partitioned by created_time but sorted by seqId during retrieval.
private final long misorderingCompensationMillis;
@Override
public PageLink getPageLink(int pageSize) {
@ -48,7 +48,7 @@ public class GeneralEdgeEventFetcher implements EdgeEventFetcher {
0,
null,
null,
queueStartTs > 0 ? queueStartTs - MISORDERING_COMPENSATION_MILLIS : 0,
queueStartTs > 0 ? queueStartTs - misorderingCompensationMillis : 0,
System.currentTimeMillis());
}

View File

@ -1483,6 +1483,11 @@ edges:
no_read_records_sleep: "${EDGES_NO_READ_RECORDS_SLEEP:1000}"
# Number of milliseconds to wait before resending failed batch of edge events to edge
sleep_between_batches: "${EDGES_SLEEP_BETWEEN_BATCHES:60000}"
# Time (in milliseconds) to subtract from the start timestamp when fetching edge events.
# This compensates for possible misordering between `created_time` (used for partitioning)
# and `seqId` (used for sorting). Without this, events with smaller seqId but larger created_time
# might be skipped, especially across partition boundaries.
misordering_compensation_millis: "${EDGES_MISORDERING_COMPENSATION_MILLIS:60000}"
# Max number of high priority edge events per edge session. No persistence - stored in memory
max_high_priority_queue_size_per_session: "${EDGES_MAX_HIGH_PRIORITY_QUEUE_SIZE_PER_SESSION:10000}"
# Number of threads that are used to check DB for edge events