diff --git a/application/src/main/java/org/thingsboard/server/service/queue/processing/TbRuleEngineProcessingStrategyFactory.java b/application/src/main/java/org/thingsboard/server/service/queue/processing/TbRuleEngineProcessingStrategyFactory.java index a14addaadc..e63b0cef8e 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/processing/TbRuleEngineProcessingStrategyFactory.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/processing/TbRuleEngineProcessingStrategyFactory.java @@ -27,7 +27,6 @@ import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; @Component @Slf4j @@ -57,13 +56,11 @@ public class TbRuleEngineProcessingStrategyFactory { private final boolean retryTimeout; private final int maxRetries; private final double maxAllowedFailurePercentage; - private final long pauseBetweenRetries; - private final boolean expPauseBetweenRetries; + private final boolean multiplyPauseBetweenRetries; + private final long maxPauseBetweenRetries; - private long maxExpPauseBetweenRetries; - private double maxExpDegreeValue; - private AtomicInteger expDegreeStep; + private long pauseBetweenRetries; private int initialTotalCount; private int retryCount; @@ -76,12 +73,8 @@ public class TbRuleEngineProcessingStrategyFactory { this.maxRetries = configuration.getRetries(); this.maxAllowedFailurePercentage = configuration.getFailurePercentage(); this.pauseBetweenRetries = configuration.getPauseBetweenRetries(); - this.expPauseBetweenRetries = configuration.isExpPauseBetweenRetries(); - if (this.expPauseBetweenRetries) { - this.expDegreeStep = new AtomicInteger(1); - this.maxExpPauseBetweenRetries = configuration.getMaxExpPauseBetweenRetries(); - this.maxExpDegreeValue = Math.log(maxExpPauseBetweenRetries) / Math.log(pauseBetweenRetries); - } + this.multiplyPauseBetweenRetries = configuration.isMultiplyPauseBetweenRetries(); + this.maxPauseBetweenRetries = configuration.getMaxPauseBetweenRetries(); } @Override @@ -116,24 +109,14 @@ public class TbRuleEngineProcessingStrategyFactory { toReprocess.forEach((id, msg) -> log.trace("Going to reprocess [{}]: {}", id, TbMsg.fromBytes(result.getQueueName(), msg.getValue().getTbMsg().toByteArray(), TbMsgCallback.EMPTY))); } if (pauseBetweenRetries > 0) { - if (expPauseBetweenRetries) { - long pause; - if (maxExpDegreeValue > expDegreeStep.get()) { - pause = new Double(Math.pow(pauseBetweenRetries, expDegreeStep.getAndIncrement())).longValue(); - } else { - pause = maxExpPauseBetweenRetries; - } - try { - Thread.sleep(TimeUnit.SECONDS.toMillis( - pause)); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - } else { - try { - Thread.sleep(TimeUnit.SECONDS.toMillis(pauseBetweenRetries)); - } catch (InterruptedException e) { - throw new RuntimeException(e); + try { + Thread.sleep(TimeUnit.SECONDS.toMillis(pauseBetweenRetries)); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + if (multiplyPauseBetweenRetries && maxPauseBetweenRetries > 0) { + if (pauseBetweenRetries != maxPauseBetweenRetries) { + pauseBetweenRetries = Math.min(maxPauseBetweenRetries, pauseBetweenRetries * pauseBetweenRetries); } } } diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index de36e549f9..c6f45dfdee 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -725,8 +725,8 @@ queue: retries: "${TB_QUEUE_RE_MAIN_PROCESSING_STRATEGY_RETRIES:3}" # Number of retries, 0 is unlimited failure-percentage: "${TB_QUEUE_RE_MAIN_PROCESSING_STRATEGY_FAILURE_PERCENTAGE:0}" # Skip retry if failures or timeouts are less then X percentage of messages; pause-between-retries: "${TB_QUEUE_RE_MAIN_PROCESSING_STRATEGY_RETRY_PAUSE:3}"# Time in seconds to wait in consumer thread before retries; - exp-pause-between-retries: "${TB_QUEUE_RE_MAIN_PROCESSING_STRATEGY_EXP_RETRY_PAUSE:false}"# Parameter to enable/disable exponential increase of pause between retries; - max-exp-pause-between-retries: "${TB_QUEUE_RE_MAIN_PROCESSING_STRATEGY_MAX_EXP_RETRY_PAUSE:25}"# Max allowed time in seconds for pause between retries. + multiply-pause-between-retries: "${TB_QUEUE_RE_MAIN_PROCESSING_STRATEGY_MUL_RETRY_PAUSE:false}"# Parameter to enable/disable multiplication of pause value between retries on each iteration; + max-pause-between-retries: "${TB_QUEUE_RE_MAIN_PROCESSING_STRATEGY_MAX_MUL_RETRY_PAUSE:25}"# Max allowed time in seconds for pause between retries. - name: "${TB_QUEUE_RE_HP_QUEUE_NAME:HighPriority}" topic: "${TB_QUEUE_RE_HP_TOPIC:tb_rule_engine.hp}" poll-interval: "${TB_QUEUE_RE_HP_POLL_INTERVAL_MS:25}" @@ -742,8 +742,8 @@ queue: retries: "${TB_QUEUE_RE_HP_PROCESSING_STRATEGY_RETRIES:0}" # Number of retries, 0 is unlimited failure-percentage: "${TB_QUEUE_RE_HP_PROCESSING_STRATEGY_FAILURE_PERCENTAGE:0}" # Skip retry if failures or timeouts are less then X percentage of messages; pause-between-retries: "${TB_QUEUE_RE_HP_PROCESSING_STRATEGY_RETRY_PAUSE:5}"# Time in seconds to wait in consumer thread before retries; - exp-pause-between-retries: "${TB_QUEUE_RE_HP_PROCESSING_STRATEGY_EXP_RETRY_PAUSE:false}"# Parameter to enable/disable exponential increase of pause between retries; - max-exp-pause-between-retries: "${TB_QUEUE_RE_HP_PROCESSING_STRATEGY_MAX_EXP_RETRY_PAUSE:120}"# Max allowed time in seconds for pause between retries. + multiply-pause-between-retries: "${TB_QUEUE_RE_HP_PROCESSING_STRATEGY_MUL_RETRY_PAUSE:false}"# Parameter to enable/disable multiplication of pause value between retries on each iteration; + max-pause-between-retries: "${TB_QUEUE_RE_HP_PROCESSING_STRATEGY_MAX_MUL_RETRY_PAUSE:120}"# Max allowed time in seconds for pause between retries. - name: "${TB_QUEUE_RE_SQ_QUEUE_NAME:SequentialByOriginator}" topic: "${TB_QUEUE_RE_SQ_TOPIC:tb_rule_engine.sq}" poll-interval: "${TB_QUEUE_RE_SQ_POLL_INTERVAL_MS:25}" @@ -759,8 +759,8 @@ queue: retries: "${TB_QUEUE_RE_SQ_PROCESSING_STRATEGY_RETRIES:3}" # Number of retries, 0 is unlimited failure-percentage: "${TB_QUEUE_RE_SQ_PROCESSING_STRATEGY_FAILURE_PERCENTAGE:0}" # Skip retry if failures or timeouts are less then X percentage of messages; pause-between-retries: "${TB_QUEUE_RE_SQ_PROCESSING_STRATEGY_RETRY_PAUSE:5}"# Time in seconds to wait in consumer thread before retries; - exp-pause-between-retries: "${TB_QUEUE_RE_SQ_PROCESSING_STRATEGY_EXP_RETRY_PAUSE:false}"# Parameter to enable/disable exponential increase of pause between retries; - max-exp-pause-between-retries: "${TB_QUEUE_RE_SQ_PROCESSING_STRATEGY_MAX_EXP_RETRY_PAUSE:120}"# Max allowed time in seconds for pause between retries. + multiply-pause-between-retries: "${TB_QUEUE_RE_SQ_PROCESSING_STRATEGY_MUL_RETRY_PAUSE:false}"# Parameter to enable/disable multiplication of pause value between retries on each iteration; + max-pause-between-retries: "${TB_QUEUE_RE_SQ_PROCESSING_STRATEGY_MAX_MUL_RETRY_PAUSE:120}"# Max allowed time in seconds for pause between retries. transport: # For high priority notifications that require minimum latency and processing time notifications_topic: "${TB_QUEUE_TRANSPORT_NOTIFICATIONS_TOPIC:tb_transport.notifications}" diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/settings/TbRuleEngineQueueAckStrategyConfiguration.java b/common/queue/src/main/java/org/thingsboard/server/queue/settings/TbRuleEngineQueueAckStrategyConfiguration.java index 2e61fe8b93..9427228047 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/settings/TbRuleEngineQueueAckStrategyConfiguration.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/settings/TbRuleEngineQueueAckStrategyConfiguration.java @@ -24,7 +24,7 @@ public class TbRuleEngineQueueAckStrategyConfiguration { private int retries; private double failurePercentage; private long pauseBetweenRetries; - private boolean expPauseBetweenRetries; - private long maxExpPauseBetweenRetries; + private boolean multiplyPauseBetweenRetries; + private long maxPauseBetweenRetries; }