diff --git a/application/src/main/java/org/thingsboard/server/service/queue/processing/TbRuleEngineProcessingStrategyFactory.java b/application/src/main/java/org/thingsboard/server/service/queue/processing/TbRuleEngineProcessingStrategyFactory.java index b6220f5f94..b42615ad8f 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/processing/TbRuleEngineProcessingStrategyFactory.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/processing/TbRuleEngineProcessingStrategyFactory.java @@ -56,7 +56,9 @@ public class TbRuleEngineProcessingStrategyFactory { private final boolean retryTimeout; private final int maxRetries; private final double maxAllowedFailurePercentage; - private final long pauseBetweenRetries; + private final long maxPauseBetweenRetries; + + private long pauseBetweenRetries; private int initialTotalCount; private int retryCount; @@ -69,6 +71,7 @@ public class TbRuleEngineProcessingStrategyFactory { this.maxRetries = configuration.getRetries(); this.maxAllowedFailurePercentage = configuration.getFailurePercentage(); this.pauseBetweenRetries = configuration.getPauseBetweenRetries(); + this.maxPauseBetweenRetries = configuration.getMaxPauseBetweenRetries(); } @Override @@ -108,6 +111,9 @@ public class TbRuleEngineProcessingStrategyFactory { } catch (InterruptedException e) { throw new RuntimeException(e); } + if (maxPauseBetweenRetries > pauseBetweenRetries) { + pauseBetweenRetries = Math.min(maxPauseBetweenRetries, pauseBetweenRetries * 2); + } } return new TbRuleEngineProcessingDecision(false, toReprocess); } diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 92893a9c5f..c363207e49 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -758,6 +758,7 @@ queue: retries: "${TB_QUEUE_RE_MAIN_PROCESSING_STRATEGY_RETRIES:3}" # Number of retries, 0 is unlimited failure-percentage: "${TB_QUEUE_RE_MAIN_PROCESSING_STRATEGY_FAILURE_PERCENTAGE:0}" # Skip retry if failures or timeouts are less then X percentage of messages; pause-between-retries: "${TB_QUEUE_RE_MAIN_PROCESSING_STRATEGY_RETRY_PAUSE:3}"# Time in seconds to wait in consumer thread before retries; + max-pause-between-retries: "${TB_QUEUE_RE_MAIN_PROCESSING_STRATEGY_MAX_RETRY_PAUSE:3}"# Max allowed time in seconds for pause between retries. - name: "${TB_QUEUE_RE_HP_QUEUE_NAME:HighPriority}" topic: "${TB_QUEUE_RE_HP_TOPIC:tb_rule_engine.hp}" poll-interval: "${TB_QUEUE_RE_HP_POLL_INTERVAL_MS:25}" @@ -773,6 +774,7 @@ queue: retries: "${TB_QUEUE_RE_HP_PROCESSING_STRATEGY_RETRIES:0}" # Number of retries, 0 is unlimited failure-percentage: "${TB_QUEUE_RE_HP_PROCESSING_STRATEGY_FAILURE_PERCENTAGE:0}" # Skip retry if failures or timeouts are less then X percentage of messages; pause-between-retries: "${TB_QUEUE_RE_HP_PROCESSING_STRATEGY_RETRY_PAUSE:5}"# Time in seconds to wait in consumer thread before retries; + max-pause-between-retries: "${TB_QUEUE_RE_HP_PROCESSING_STRATEGY_MAX_RETRY_PAUSE:5}"# Max allowed time in seconds for pause between retries. - name: "${TB_QUEUE_RE_SQ_QUEUE_NAME:SequentialByOriginator}" topic: "${TB_QUEUE_RE_SQ_TOPIC:tb_rule_engine.sq}" poll-interval: "${TB_QUEUE_RE_SQ_POLL_INTERVAL_MS:25}" @@ -788,6 +790,7 @@ queue: retries: "${TB_QUEUE_RE_SQ_PROCESSING_STRATEGY_RETRIES:3}" # Number of retries, 0 is unlimited failure-percentage: "${TB_QUEUE_RE_SQ_PROCESSING_STRATEGY_FAILURE_PERCENTAGE:0}" # Skip retry if failures or timeouts are less then X percentage of messages; pause-between-retries: "${TB_QUEUE_RE_SQ_PROCESSING_STRATEGY_RETRY_PAUSE:5}"# Time in seconds to wait in consumer thread before retries; + max-pause-between-retries: "${TB_QUEUE_RE_SQ_PROCESSING_STRATEGY_MAX_RETRY_PAUSE:5}"# Max allowed time in seconds for pause between retries. transport: # For high priority notifications that require minimum latency and processing time notifications_topic: "${TB_QUEUE_TRANSPORT_NOTIFICATIONS_TOPIC:tb_transport.notifications}" diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/settings/TbRuleEngineQueueAckStrategyConfiguration.java b/common/queue/src/main/java/org/thingsboard/server/queue/settings/TbRuleEngineQueueAckStrategyConfiguration.java index 0d21c59c9c..978794662a 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/settings/TbRuleEngineQueueAckStrategyConfiguration.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/settings/TbRuleEngineQueueAckStrategyConfiguration.java @@ -24,5 +24,6 @@ public class TbRuleEngineQueueAckStrategyConfiguration { private int retries; private double failurePercentage; private long pauseBetweenRetries; + private long maxPauseBetweenRetries; }