From b191c2a2ec5932d39b62370da2889c116e59833e Mon Sep 17 00:00:00 2001 From: Yevhenii Date: Tue, 29 Jul 2025 17:07:58 +0300 Subject: [PATCH] Edge Misordering Compensation Config - Moved misordering compensation to configuration for better control of edge event handling and data consistency --- .../server/service/edge/rpc/EdgeEventStorageSettings.java | 2 ++ .../server/service/edge/rpc/EdgeGrpcSession.java | 2 +- .../service/edge/rpc/fetch/GeneralEdgeEventFetcher.java | 8 +++----- application/src/main/resources/thingsboard.yml | 3 +++ 4 files changed, 9 insertions(+), 6 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeEventStorageSettings.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeEventStorageSettings.java index 89d19438bd..d9e3f1a920 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeEventStorageSettings.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeEventStorageSettings.java @@ -29,4 +29,6 @@ public class EdgeEventStorageSettings { private long noRecordsSleepInterval; @Value("${edges.storage.sleep_between_batches}") private long sleepIntervalBetweenBatches; + @Value("${edges.storage.misordering_compensation_millis:3600000}") + private long misorderingCompensationMillis; } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java index 521730741f..03732bed64 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java @@ -589,7 +589,7 @@ public abstract class EdgeGrpcSession implements Closeable { previousStartSeqId, false, Integer.toUnsignedLong(ctx.getEdgeEventStorageSettings().getMaxReadRecordsCount()), - ctx.getEdgeEventService()); + ctx.getEdgeEventService(), ctx.getEdgeEventStorageSettings().getMisorderingCompensationMillis()); log.trace("[{}][{}] starting processing edge events, previousStartTs = {}, previousStartSeqId = {}", tenantId, edge.getId(), previousStartTs, previousStartSeqId); Futures.addCallback(startProcessingEdgeEvents(fetcher), new FutureCallback<>() { diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/GeneralEdgeEventFetcher.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/GeneralEdgeEventFetcher.java index 5d7df601b5..e3fa970284 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/GeneralEdgeEventFetcher.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/GeneralEdgeEventFetcher.java @@ -26,13 +26,9 @@ import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.common.data.page.TimePageLink; import org.thingsboard.server.dao.edge.EdgeEventService; -import java.util.concurrent.TimeUnit; - @AllArgsConstructor @Slf4j public class GeneralEdgeEventFetcher implements EdgeEventFetcher { - // Subtract from queueStartTs to ensure no data is lost due to potential misordering of edge events by created_time. - private static final long MISORDERING_COMPENSATION_MILLIS = TimeUnit.SECONDS.toMillis(60); private final Long queueStartTs; private Long seqIdStart; @@ -40,6 +36,8 @@ public class GeneralEdgeEventFetcher implements EdgeEventFetcher { private boolean seqIdNewCycleStarted; private Long maxReadRecordsCount; private final EdgeEventService edgeEventService; + // Subtract from queueStartTs to ensure no data is lost due to potential misordering of edge events by created_time. + private final long misorderingCompensationMillis; @Override public PageLink getPageLink(int pageSize) { @@ -48,7 +46,7 @@ public class GeneralEdgeEventFetcher implements EdgeEventFetcher { 0, null, null, - queueStartTs > 0 ? queueStartTs - MISORDERING_COMPENSATION_MILLIS : 0, + queueStartTs > 0 ? queueStartTs - misorderingCompensationMillis : 0, System.currentTimeMillis()); } diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 0c1d419da2..61ed93b03c 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -1480,6 +1480,9 @@ edges: no_read_records_sleep: "${EDGES_NO_READ_RECORDS_SLEEP:1000}" # Number of milliseconds to wait before resending failed batch of edge events to edge sleep_between_batches: "${EDGES_SLEEP_BETWEEN_BATCHES:60000}" + # Number of milliseconds to subtract from queue start timestamp to compensate for potential + # misordering of edge events by created_time. Prevents skipping early events. + misordering_compensation_millis: "${EDGES_MISORDERING_COMPENSATION_MILLIS:3600000}" # Max number of high priority edge events per edge session. No persistence - stored in memory max_high_priority_queue_size_per_session: "${EDGES_MAX_HIGH_PRIORITY_QUEUE_SIZE_PER_SESSION:10000}" # Number of threads that are used to check DB for edge events