From b191c2a2ec5932d39b62370da2889c116e59833e Mon Sep 17 00:00:00 2001 From: Yevhenii Date: Tue, 29 Jul 2025 17:07:58 +0300 Subject: [PATCH 1/5] Edge Misordering Compensation Config - Moved misordering compensation to configuration for better control of edge event handling and data consistency --- .../server/service/edge/rpc/EdgeEventStorageSettings.java | 2 ++ .../server/service/edge/rpc/EdgeGrpcSession.java | 2 +- .../service/edge/rpc/fetch/GeneralEdgeEventFetcher.java | 8 +++----- application/src/main/resources/thingsboard.yml | 3 +++ 4 files changed, 9 insertions(+), 6 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeEventStorageSettings.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeEventStorageSettings.java index 89d19438bd..d9e3f1a920 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeEventStorageSettings.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeEventStorageSettings.java @@ -29,4 +29,6 @@ public class EdgeEventStorageSettings { private long noRecordsSleepInterval; @Value("${edges.storage.sleep_between_batches}") private long sleepIntervalBetweenBatches; + @Value("${edges.storage.misordering_compensation_millis:3600000}") + private long misorderingCompensationMillis; } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java index 521730741f..03732bed64 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java @@ -589,7 +589,7 @@ public abstract class EdgeGrpcSession implements Closeable { previousStartSeqId, false, Integer.toUnsignedLong(ctx.getEdgeEventStorageSettings().getMaxReadRecordsCount()), - ctx.getEdgeEventService()); + 
ctx.getEdgeEventService(), ctx.getEdgeEventStorageSettings().getMisorderingCompensationMillis()); log.trace("[{}][{}] starting processing edge events, previousStartTs = {}, previousStartSeqId = {}", tenantId, edge.getId(), previousStartTs, previousStartSeqId); Futures.addCallback(startProcessingEdgeEvents(fetcher), new FutureCallback<>() { diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/GeneralEdgeEventFetcher.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/GeneralEdgeEventFetcher.java index 5d7df601b5..e3fa970284 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/GeneralEdgeEventFetcher.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/GeneralEdgeEventFetcher.java @@ -26,13 +26,9 @@ import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.common.data.page.TimePageLink; import org.thingsboard.server.dao.edge.EdgeEventService; -import java.util.concurrent.TimeUnit; - @AllArgsConstructor @Slf4j public class GeneralEdgeEventFetcher implements EdgeEventFetcher { - // Subtract from queueStartTs to ensure no data is lost due to potential misordering of edge events by created_time. - private static final long MISORDERING_COMPENSATION_MILLIS = TimeUnit.SECONDS.toMillis(60); private final Long queueStartTs; private Long seqIdStart; @@ -40,6 +36,8 @@ public class GeneralEdgeEventFetcher implements EdgeEventFetcher { private boolean seqIdNewCycleStarted; private Long maxReadRecordsCount; private final EdgeEventService edgeEventService; + // Subtract from queueStartTs to ensure no data is lost due to potential misordering of edge events by created_time. + private final long misorderingCompensationMillis; @Override public PageLink getPageLink(int pageSize) { @@ -48,7 +46,7 @@ public class GeneralEdgeEventFetcher implements EdgeEventFetcher { 0, null, null, - queueStartTs > 0 ? 
queueStartTs - MISORDERING_COMPENSATION_MILLIS : 0, + queueStartTs > 0 ? queueStartTs - misorderingCompensationMillis : 0, System.currentTimeMillis()); } diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 0c1d419da2..61ed93b03c 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -1480,6 +1480,9 @@ edges: no_read_records_sleep: "${EDGES_NO_READ_RECORDS_SLEEP:1000}" # Number of milliseconds to wait before resending failed batch of edge events to edge sleep_between_batches: "${EDGES_SLEEP_BETWEEN_BATCHES:60000}" + # Number of milliseconds to subtract from queue start timestamp to compensate for potential + # misordering of edge events by created_time. Prevents skipping early events. + misordering_compensation_millis: "${EDGES_MISORDERING_COMPENSATION_MILLIS:3600000}" # Max number of high priority edge events per edge session. No persistence - stored in memory max_high_priority_queue_size_per_session: "${EDGES_MAX_HIGH_PRIORITY_QUEUE_SIZE_PER_SESSION:10000}" # Number of threads that are used to check DB for edge events From a101c9403ffb8f496e77ba5d616655153f3ca0d1 Mon Sep 17 00:00:00 2001 From: Yevhenii Date: Tue, 29 Jul 2025 17:44:15 +0300 Subject: [PATCH 2/5] Refactoring --- .../thingsboard/server/service/edge/rpc/EdgeGrpcSession.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java index 03732bed64..a86409c6af 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java @@ -589,7 +589,8 @@ public abstract class EdgeGrpcSession implements Closeable { previousStartSeqId, false, 
Integer.toUnsignedLong(ctx.getEdgeEventStorageSettings().getMaxReadRecordsCount()), - ctx.getEdgeEventService(), ctx.getEdgeEventStorageSettings().getMisorderingCompensationMillis()); + ctx.getEdgeEventService(), + ctx.getEdgeEventStorageSettings().getMisorderingCompensationMillis()); log.trace("[{}][{}] starting processing edge events, previousStartTs = {}, previousStartSeqId = {}", tenantId, edge.getId(), previousStartTs, previousStartSeqId); Futures.addCallback(startProcessingEdgeEvents(fetcher), new FutureCallback<>() { From 2c7c63c537e5f4c0685fad5e699042ee511f7e68 Mon Sep 17 00:00:00 2001 From: Yevhenii Date: Tue, 29 Jul 2025 18:36:07 +0300 Subject: [PATCH 3/5] Clarify misordering compensation comments --- .../service/edge/rpc/fetch/GeneralEdgeEventFetcher.java | 4 +++- application/src/main/resources/thingsboard.yml | 6 ++++-- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/GeneralEdgeEventFetcher.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/GeneralEdgeEventFetcher.java index e3fa970284..b4f894c422 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/GeneralEdgeEventFetcher.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/GeneralEdgeEventFetcher.java @@ -36,7 +36,9 @@ public class GeneralEdgeEventFetcher implements EdgeEventFetcher { private boolean seqIdNewCycleStarted; private Long maxReadRecordsCount; private final EdgeEventService edgeEventService; - // Subtract from queueStartTs to ensure no data is lost due to potential misordering of edge events by created_time. + // Subtract from queueStartTs to compensate for possible misalignment between `created_time` and `seqId`. + // This ensures early events with lower seqId are not skipped due to partitioning by `created_time`. + // See: edge_event is partitioned by created_time but sorted by seqId during retrieval. 
private final long misorderingCompensationMillis; @Override diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 61ed93b03c..d36a890861 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -1480,8 +1480,10 @@ edges: no_read_records_sleep: "${EDGES_NO_READ_RECORDS_SLEEP:1000}" # Number of milliseconds to wait before resending failed batch of edge events to edge sleep_between_batches: "${EDGES_SLEEP_BETWEEN_BATCHES:60000}" - # Number of milliseconds to subtract from queue start timestamp to compensate for potential - # misordering of edge events by created_time. Prevents skipping early events. + # Time (in milliseconds) to subtract from the start timestamp when fetching edge events. + # This compensates for possible misordering between `created_time` (used for partitioning) + # and `seqId` (used for sorting). Without this, events with smaller seqId but larger created_time + # might be skipped, especially across partition boundaries. misordering_compensation_millis: "${EDGES_MISORDERING_COMPENSATION_MILLIS:3600000}" # Max number of high priority edge events per edge session. 
No persistence - stored in memory max_high_priority_queue_size_per_session: "${EDGES_MAX_HIGH_PRIORITY_QUEUE_SIZE_PER_SESSION:10000}" From 2e4b917e09096337c0af3af5368606a9364bd81f Mon Sep 17 00:00:00 2001 From: Yevhenii Date: Thu, 31 Jul 2025 16:00:13 +0300 Subject: [PATCH 4/5] Lower default misordering compensation to 60000 ms --- application/src/main/resources/thingsboard.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index d36a890861..b515884842 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -1484,7 +1484,7 @@ edges: # This compensates for possible misordering between `created_time` (used for partitioning) # and `seqId` (used for sorting). Without this, events with smaller seqId but larger created_time # might be skipped, especially across partition boundaries. - misordering_compensation_millis: "${EDGES_MISORDERING_COMPENSATION_MILLIS:3600000}" + misordering_compensation_millis: "${EDGES_MISORDERING_COMPENSATION_MILLIS:60000}" # Max number of high priority edge events per edge session. 
No persistence - stored in memory max_high_priority_queue_size_per_session: "${EDGES_MAX_HIGH_PRIORITY_QUEUE_SIZE_PER_SESSION:10000}" # Number of threads that are used to check DB for edge events From 51b610f679da1f67ea0b758366cbac427be388bb Mon Sep 17 00:00:00 2001 From: Yevhenii Date: Mon, 4 Aug 2025 15:25:22 +0300 Subject: [PATCH 5/5] Refactoring default value --- .../server/service/edge/rpc/EdgeEventStorageSettings.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeEventStorageSettings.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeEventStorageSettings.java index d9e3f1a920..618aee5e00 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeEventStorageSettings.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeEventStorageSettings.java @@ -29,6 +29,6 @@ public class EdgeEventStorageSettings { private long noRecordsSleepInterval; @Value("${edges.storage.sleep_between_batches}") private long sleepIntervalBetweenBatches; - @Value("${edges.storage.misordering_compensation_millis:3600000}") + @Value("${edges.storage.misordering_compensation_millis:60000}") private long misorderingCompensationMillis; }