Edge Misordering Compensation Config
- Moved misordering compensation to configuration for better control of edge event handling and data consistency
This commit is contained in:
		
							parent
							
								
									320e0b674e
								
							
						
					
					
						commit
						b191c2a2ec
					
				@ -29,4 +29,6 @@ public class EdgeEventStorageSettings {
 | 
				
			|||||||
    private long noRecordsSleepInterval;
 | 
					    private long noRecordsSleepInterval;
 | 
				
			||||||
    @Value("${edges.storage.sleep_between_batches}")
 | 
					    @Value("${edges.storage.sleep_between_batches}")
 | 
				
			||||||
    private long sleepIntervalBetweenBatches;
 | 
					    private long sleepIntervalBetweenBatches;
 | 
				
			||||||
 | 
					    @Value("${edges.storage.misordering_compensation_millis:3600000}")
 | 
				
			||||||
 | 
					    private long misorderingCompensationMillis;
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
				
			|||||||
@ -589,7 +589,7 @@ public abstract class EdgeGrpcSession implements Closeable {
 | 
				
			|||||||
                    previousStartSeqId,
 | 
					                    previousStartSeqId,
 | 
				
			||||||
                    false,
 | 
					                    false,
 | 
				
			||||||
                    Integer.toUnsignedLong(ctx.getEdgeEventStorageSettings().getMaxReadRecordsCount()),
 | 
					                    Integer.toUnsignedLong(ctx.getEdgeEventStorageSettings().getMaxReadRecordsCount()),
 | 
				
			||||||
                    ctx.getEdgeEventService());
 | 
					                    ctx.getEdgeEventService(), ctx.getEdgeEventStorageSettings().getMisorderingCompensationMillis());
 | 
				
			||||||
            log.trace("[{}][{}] starting processing edge events, previousStartTs = {}, previousStartSeqId = {}",
 | 
					            log.trace("[{}][{}] starting processing edge events, previousStartTs = {}, previousStartSeqId = {}",
 | 
				
			||||||
                    tenantId, edge.getId(), previousStartTs, previousStartSeqId);
 | 
					                    tenantId, edge.getId(), previousStartTs, previousStartSeqId);
 | 
				
			||||||
            Futures.addCallback(startProcessingEdgeEvents(fetcher), new FutureCallback<>() {
 | 
					            Futures.addCallback(startProcessingEdgeEvents(fetcher), new FutureCallback<>() {
 | 
				
			||||||
 | 
				
			|||||||
@ -26,13 +26,9 @@ import org.thingsboard.server.common.data.page.PageLink;
 | 
				
			|||||||
import org.thingsboard.server.common.data.page.TimePageLink;
 | 
					import org.thingsboard.server.common.data.page.TimePageLink;
 | 
				
			||||||
import org.thingsboard.server.dao.edge.EdgeEventService;
 | 
					import org.thingsboard.server.dao.edge.EdgeEventService;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
import java.util.concurrent.TimeUnit;
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
@AllArgsConstructor
 | 
					@AllArgsConstructor
 | 
				
			||||||
@Slf4j
 | 
					@Slf4j
 | 
				
			||||||
public class GeneralEdgeEventFetcher implements EdgeEventFetcher {
 | 
					public class GeneralEdgeEventFetcher implements EdgeEventFetcher {
 | 
				
			||||||
    // Subtract from queueStartTs to ensure no data is lost due to potential misordering of edge events by created_time.
 | 
					 | 
				
			||||||
    private static final long MISORDERING_COMPENSATION_MILLIS = TimeUnit.SECONDS.toMillis(60);
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
    private final Long queueStartTs;
 | 
					    private final Long queueStartTs;
 | 
				
			||||||
    private Long seqIdStart;
 | 
					    private Long seqIdStart;
 | 
				
			||||||
@ -40,6 +36,8 @@ public class GeneralEdgeEventFetcher implements EdgeEventFetcher {
 | 
				
			|||||||
    private boolean seqIdNewCycleStarted;
 | 
					    private boolean seqIdNewCycleStarted;
 | 
				
			||||||
    private Long maxReadRecordsCount;
 | 
					    private Long maxReadRecordsCount;
 | 
				
			||||||
    private final EdgeEventService edgeEventService;
 | 
					    private final EdgeEventService edgeEventService;
 | 
				
			||||||
 | 
					    // Subtract from queueStartTs to ensure no data is lost due to potential misordering of edge events by created_time.
 | 
				
			||||||
 | 
					    private final long misorderingCompensationMillis;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    @Override
 | 
					    @Override
 | 
				
			||||||
    public PageLink getPageLink(int pageSize) {
 | 
					    public PageLink getPageLink(int pageSize) {
 | 
				
			||||||
@ -48,7 +46,7 @@ public class GeneralEdgeEventFetcher implements EdgeEventFetcher {
 | 
				
			|||||||
                0,
 | 
					                0,
 | 
				
			||||||
                null,
 | 
					                null,
 | 
				
			||||||
                null,
 | 
					                null,
 | 
				
			||||||
                queueStartTs > 0 ? queueStartTs - MISORDERING_COMPENSATION_MILLIS : 0,
 | 
					                queueStartTs > 0 ? queueStartTs - misorderingCompensationMillis : 0,
 | 
				
			||||||
                System.currentTimeMillis());
 | 
					                System.currentTimeMillis());
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
				
			|||||||
@ -1480,6 +1480,9 @@ edges:
 | 
				
			|||||||
    no_read_records_sleep: "${EDGES_NO_READ_RECORDS_SLEEP:1000}"
 | 
					    no_read_records_sleep: "${EDGES_NO_READ_RECORDS_SLEEP:1000}"
 | 
				
			||||||
    # Number of milliseconds to wait before resending failed batch of edge events to edge
 | 
					    # Number of milliseconds to wait before resending failed batch of edge events to edge
 | 
				
			||||||
    sleep_between_batches: "${EDGES_SLEEP_BETWEEN_BATCHES:60000}"
 | 
					    sleep_between_batches: "${EDGES_SLEEP_BETWEEN_BATCHES:60000}"
 | 
				
			||||||
 | 
					    # Number of milliseconds to subtract from queue start timestamp to compensate for potential
 | 
				
			||||||
 | 
					    # misordering of edge events by created_time. Prevents skipping early events.
 | 
				
			||||||
 | 
					    misordering_compensation_millis: "${EDGES_MISORDERING_COMPENSATION_MILLIS:3600000}"
 | 
				
			||||||
  # Max number of high priority edge events per edge session. No persistence - stored in memory
 | 
					  # Max number of high priority edge events per edge session. No persistence - stored in memory
 | 
				
			||||||
  max_high_priority_queue_size_per_session: "${EDGES_MAX_HIGH_PRIORITY_QUEUE_SIZE_PER_SESSION:10000}"
 | 
					  max_high_priority_queue_size_per_session: "${EDGES_MAX_HIGH_PRIORITY_QUEUE_SIZE_PER_SESSION:10000}"
 | 
				
			||||||
  # Number of threads that are used to check DB for edge events
 | 
					  # Number of threads that are used to check DB for edge events
 | 
				
			||||||
 | 
				
			|||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user