diff --git a/application/src/main/java/org/thingsboard/server/service/queue/processing/TbRuleEngineProcessingStrategyFactory.java b/application/src/main/java/org/thingsboard/server/service/queue/processing/TbRuleEngineProcessingStrategyFactory.java index b6220f5f94..ecec278b10 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/processing/TbRuleEngineProcessingStrategyFactory.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/processing/TbRuleEngineProcessingStrategyFactory.java @@ -27,6 +27,7 @@ import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; @Component @Slf4j @@ -58,6 +59,12 @@ public class TbRuleEngineProcessingStrategyFactory { private final double maxAllowedFailurePercentage; private final long pauseBetweenRetries; + private final boolean expPauseBetweenRetries; + + private long maxExpPauseBetweenRetries; + private int maxExpDegreeValue; + private AtomicInteger expDegreeStep; + private int initialTotalCount; private int retryCount; @@ -69,6 +76,12 @@ public class TbRuleEngineProcessingStrategyFactory { this.maxRetries = configuration.getRetries(); this.maxAllowedFailurePercentage = configuration.getFailurePercentage(); this.pauseBetweenRetries = configuration.getPauseBetweenRetries(); + this.expPauseBetweenRetries = configuration.isExpPauseBetweenRetries(); + if (this.expPauseBetweenRetries) { + this.expDegreeStep = new AtomicInteger(1); + this.maxExpPauseBetweenRetries = configuration.getMaxExpPauseBetweenRetries(); + this.maxExpDegreeValue = new Double(Math.log(maxExpPauseBetweenRetries) / Math.log(pauseBetweenRetries)).intValue(); + } } @Override @@ -103,10 +116,25 @@ public class TbRuleEngineProcessingStrategyFactory { toReprocess.forEach((id, msg) -> log.trace("Going to reprocess [{}]: {}", id, TbMsg.fromBytes(result.getQueueName(), msg.getValue().getTbMsg().toByteArray(), TbMsgCallback.EMPTY))); } if (pauseBetweenRetries > 0) { - try { - Thread.sleep(TimeUnit.SECONDS.toMillis(pauseBetweenRetries)); - } catch (InterruptedException e) { - throw new RuntimeException(e); + if (expPauseBetweenRetries) { + long pause; + if (maxExpDegreeValue > expDegreeStep.get()) { + pause = new Double(Math.pow(pauseBetweenRetries, expDegreeStep.getAndIncrement())).longValue(); + } else { + pause = maxExpPauseBetweenRetries; + } + try { + Thread.sleep(TimeUnit.SECONDS.toMillis( + pause)); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } else { + try { + Thread.sleep(TimeUnit.SECONDS.toMillis(pauseBetweenRetries)); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } } } return new TbRuleEngineProcessingDecision(false, toReprocess); diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index b6894bb0e7..6818d3069a 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -725,6 +725,8 @@ queue: retries: "${TB_QUEUE_RE_MAIN_PROCESSING_STRATEGY_RETRIES:3}" # Number of retries, 0 is unlimited failure-percentage: "${TB_QUEUE_RE_MAIN_PROCESSING_STRATEGY_FAILURE_PERCENTAGE:0}" # Skip retry if failures or timeouts are less then X percentage of messages; pause-between-retries: "${TB_QUEUE_RE_MAIN_PROCESSING_STRATEGY_RETRY_PAUSE:3}"# Time in seconds to wait in consumer thread before retries; + exp-pause-between-retries: "${TB_QUEUE_RE_MAIN_PROCESSING_STRATEGY_EXP_RETRY_PAUSE:false}"# Parameter to enable/disable exponential increase of pause between retries; + max-exp-pause-between-retries: "${TB_QUEUE_RE_MAIN_PROCESSING_STRATEGY_MAX_EXP_RETRY_PAUSE:86400}"# Max allowed time in seconds for pause between retries. - name: "${TB_QUEUE_RE_HP_QUEUE_NAME:HighPriority}" topic: "${TB_QUEUE_RE_HP_TOPIC:tb_rule_engine.hp}" poll-interval: "${TB_QUEUE_RE_HP_POLL_INTERVAL_MS:25}" @@ -740,6 +742,8 @@ queue: retries: "${TB_QUEUE_RE_HP_PROCESSING_STRATEGY_RETRIES:0}" # Number of retries, 0 is unlimited failure-percentage: "${TB_QUEUE_RE_HP_PROCESSING_STRATEGY_FAILURE_PERCENTAGE:0}" # Skip retry if failures or timeouts are less then X percentage of messages; pause-between-retries: "${TB_QUEUE_RE_HP_PROCESSING_STRATEGY_RETRY_PAUSE:5}"# Time in seconds to wait in consumer thread before retries; + exp-pause-between-retries: "${TB_QUEUE_RE_MAIN_PROCESSING_STRATEGY_EXP_RETRY_PAUSE:false}"# Parameter to enable/disable exponential increase of pause between retries; + max-exp-pause-between-retries: "${TB_QUEUE_RE_MAIN_PROCESSING_STRATEGY_MAX_EXP_RETRY_PAUSE:86400}"# Max allowed time in seconds for pause between retries. - name: "${TB_QUEUE_RE_SQ_QUEUE_NAME:SequentialByOriginator}" topic: "${TB_QUEUE_RE_SQ_TOPIC:tb_rule_engine.sq}" poll-interval: "${TB_QUEUE_RE_SQ_POLL_INTERVAL_MS:25}" @@ -755,6 +759,8 @@ queue: retries: "${TB_QUEUE_RE_SQ_PROCESSING_STRATEGY_RETRIES:3}" # Number of retries, 0 is unlimited failure-percentage: "${TB_QUEUE_RE_SQ_PROCESSING_STRATEGY_FAILURE_PERCENTAGE:0}" # Skip retry if failures or timeouts are less then X percentage of messages; pause-between-retries: "${TB_QUEUE_RE_SQ_PROCESSING_STRATEGY_RETRY_PAUSE:5}"# Time in seconds to wait in consumer thread before retries; + exp-pause-between-retries: "${TB_QUEUE_RE_MAIN_PROCESSING_STRATEGY_EXP_RETRY_PAUSE:false}"# Parameter to enable/disable exponential increase of pause between retries; + max-exp-pause-between-retries: "${TB_QUEUE_RE_MAIN_PROCESSING_STRATEGY_MAX_EXP_RETRY_PAUSE:86400}"# Max allowed time in seconds for pause between retries. transport: # For high priority notifications that require minimum latency and processing time notifications_topic: "${TB_QUEUE_TRANSPORT_NOTIFICATIONS_TOPIC:tb_transport.notifications}" diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/settings/TbRuleEngineQueueAckStrategyConfiguration.java b/common/queue/src/main/java/org/thingsboard/server/queue/settings/TbRuleEngineQueueAckStrategyConfiguration.java index 0d21c59c9c..2e61fe8b93 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/settings/TbRuleEngineQueueAckStrategyConfiguration.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/settings/TbRuleEngineQueueAckStrategyConfiguration.java @@ -24,5 +24,7 @@ public class TbRuleEngineQueueAckStrategyConfiguration { private int retries; private double failurePercentage; private long pauseBetweenRetries; + private boolean expPauseBetweenRetries; + private long maxExpPauseBetweenRetries; }