Merge pull request #3406 from ShvaykaD/improvements/queue-reprocessing-strategy-3.2
[3.2] Improvements/queue reprocessing strategy
This commit is contained in:
		
						commit
						827a1e3092
					
				@ -56,7 +56,9 @@ public class TbRuleEngineProcessingStrategyFactory {
 | 
			
		||||
        private final boolean retryTimeout;
 | 
			
		||||
        private final int maxRetries;
 | 
			
		||||
        private final double maxAllowedFailurePercentage;
 | 
			
		||||
        private final long pauseBetweenRetries;
 | 
			
		||||
        private final long maxPauseBetweenRetries;
 | 
			
		||||
 | 
			
		||||
        private long pauseBetweenRetries;
 | 
			
		||||
 | 
			
		||||
        private int initialTotalCount;
 | 
			
		||||
        private int retryCount;
 | 
			
		||||
@ -69,6 +71,7 @@ public class TbRuleEngineProcessingStrategyFactory {
 | 
			
		||||
            this.maxRetries = configuration.getRetries();
 | 
			
		||||
            this.maxAllowedFailurePercentage = configuration.getFailurePercentage();
 | 
			
		||||
            this.pauseBetweenRetries = configuration.getPauseBetweenRetries();
 | 
			
		||||
            this.maxPauseBetweenRetries = configuration.getMaxPauseBetweenRetries();
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        @Override
 | 
			
		||||
@ -108,6 +111,9 @@ public class TbRuleEngineProcessingStrategyFactory {
 | 
			
		||||
                        } catch (InterruptedException e) {
 | 
			
		||||
                            throw new RuntimeException(e);
 | 
			
		||||
                        }
 | 
			
		||||
                        if (maxPauseBetweenRetries > pauseBetweenRetries) {
 | 
			
		||||
                            pauseBetweenRetries = Math.min(maxPauseBetweenRetries, pauseBetweenRetries * 2);
 | 
			
		||||
                        }
 | 
			
		||||
                    }
 | 
			
		||||
                    return new TbRuleEngineProcessingDecision(false, toReprocess);
 | 
			
		||||
                }
 | 
			
		||||
 | 
			
		||||
@ -758,6 +758,7 @@ queue:
 | 
			
		||||
          retries: "${TB_QUEUE_RE_MAIN_PROCESSING_STRATEGY_RETRIES:3}" # Number of retries, 0 is unlimited
 | 
			
		||||
          failure-percentage: "${TB_QUEUE_RE_MAIN_PROCESSING_STRATEGY_FAILURE_PERCENTAGE:0}" # Skip retry if failures or timeouts are less then X percentage of messages;
 | 
			
		||||
          pause-between-retries: "${TB_QUEUE_RE_MAIN_PROCESSING_STRATEGY_RETRY_PAUSE:3}"# Time in seconds to wait in consumer thread before retries;
 | 
			
		||||
          max-pause-between-retries: "${TB_QUEUE_RE_MAIN_PROCESSING_STRATEGY_MAX_RETRY_PAUSE:3}"# Max allowed time in seconds for pause between retries.
 | 
			
		||||
      - name: "${TB_QUEUE_RE_HP_QUEUE_NAME:HighPriority}"
 | 
			
		||||
        topic: "${TB_QUEUE_RE_HP_TOPIC:tb_rule_engine.hp}"
 | 
			
		||||
        poll-interval: "${TB_QUEUE_RE_HP_POLL_INTERVAL_MS:25}"
 | 
			
		||||
@ -773,6 +774,7 @@ queue:
 | 
			
		||||
          retries: "${TB_QUEUE_RE_HP_PROCESSING_STRATEGY_RETRIES:0}" # Number of retries, 0 is unlimited
 | 
			
		||||
          failure-percentage: "${TB_QUEUE_RE_HP_PROCESSING_STRATEGY_FAILURE_PERCENTAGE:0}" # Skip retry if failures or timeouts are less then X percentage of messages;
 | 
			
		||||
          pause-between-retries: "${TB_QUEUE_RE_HP_PROCESSING_STRATEGY_RETRY_PAUSE:5}"# Time in seconds to wait in consumer thread before retries;
 | 
			
		||||
          max-pause-between-retries: "${TB_QUEUE_RE_HP_PROCESSING_STRATEGY_MAX_RETRY_PAUSE:5}"# Max allowed time in seconds for pause between retries.
 | 
			
		||||
      - name: "${TB_QUEUE_RE_SQ_QUEUE_NAME:SequentialByOriginator}"
 | 
			
		||||
        topic: "${TB_QUEUE_RE_SQ_TOPIC:tb_rule_engine.sq}"
 | 
			
		||||
        poll-interval: "${TB_QUEUE_RE_SQ_POLL_INTERVAL_MS:25}"
 | 
			
		||||
@ -788,6 +790,7 @@ queue:
 | 
			
		||||
          retries: "${TB_QUEUE_RE_SQ_PROCESSING_STRATEGY_RETRIES:3}" # Number of retries, 0 is unlimited
 | 
			
		||||
          failure-percentage: "${TB_QUEUE_RE_SQ_PROCESSING_STRATEGY_FAILURE_PERCENTAGE:0}" # Skip retry if failures or timeouts are less then X percentage of messages;
 | 
			
		||||
          pause-between-retries: "${TB_QUEUE_RE_SQ_PROCESSING_STRATEGY_RETRY_PAUSE:5}"# Time in seconds to wait in consumer thread before retries;
 | 
			
		||||
          max-pause-between-retries: "${TB_QUEUE_RE_SQ_PROCESSING_STRATEGY_MAX_RETRY_PAUSE:5}"# Max allowed time in seconds for pause between retries.
 | 
			
		||||
  transport:
 | 
			
		||||
    # For high priority notifications that require minimum latency and processing time
 | 
			
		||||
    notifications_topic: "${TB_QUEUE_TRANSPORT_NOTIFICATIONS_TOPIC:tb_transport.notifications}"
 | 
			
		||||
 | 
			
		||||
@ -24,5 +24,6 @@ public class TbRuleEngineQueueAckStrategyConfiguration {
 | 
			
		||||
    private int retries;
 | 
			
		||||
    private double failurePercentage;
 | 
			
		||||
    private long pauseBetweenRetries;
 | 
			
		||||
    private long maxPauseBetweenRetries;
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user