diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/deduplication/TbMsgDeduplicationNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/deduplication/TbMsgDeduplicationNode.java index f7a7c6e4dd..ce5fba102a 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/deduplication/TbMsgDeduplicationNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/deduplication/TbMsgDeduplicationNode.java @@ -64,7 +64,7 @@ import static org.thingsboard.server.common.data.DataConstants.QUEUE_NAME; @Slf4j public class TbMsgDeduplicationNode implements TbNode { - public static final int TB_MSG_DEDUPLICATION_RETRY_DELAY = 10; + public static final long TB_MSG_DEDUPLICATION_RETRY_DELAY = 10L; private TbMsgDeduplicationNodeConfiguration config; @@ -217,16 +217,17 @@ public class TbMsgDeduplicationNode implements TbNode { } private void enqueueForTellNextWithRetry(TbContext ctx, TbMsg msg, int retryAttempt) { - if (config.getMaxRetries() > retryAttempt) { + if (retryAttempt <= config.getMaxRetries()) { ctx.enqueueForTellNext(msg, TbNodeConnectionType.SUCCESS, - () -> { - log.trace("[{}][{}][{}] Successfully enqueue deduplication result message!", ctx.getSelfId(), msg.getOriginator(), retryAttempt); - }, + () -> log.trace("[{}][{}][{}] Successfully enqueue deduplication result message!", ctx.getSelfId(), msg.getOriginator(), retryAttempt), throwable -> { log.trace("[{}][{}][{}] Failed to enqueue deduplication output message due to: ", ctx.getSelfId(), msg.getOriginator(), retryAttempt, throwable); - ctx.schedule(() -> { - enqueueForTellNextWithRetry(ctx, msg, retryAttempt + 1); - }, TB_MSG_DEDUPLICATION_RETRY_DELAY, TimeUnit.SECONDS); + if (retryAttempt < config.getMaxRetries()) { + ctx.schedule(() -> enqueueForTellNextWithRetry(ctx, msg, retryAttempt + 1), TB_MSG_DEDUPLICATION_RETRY_DELAY, TimeUnit.SECONDS); + } else { + log.trace("[{}][{}] Max retries [{}] exhausted. Dropping deduplication result message [{}]", + ctx.getSelfId(), msg.getOriginator(), config.getMaxRetries(), msg.getId()); + } }); } } diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbMsgDeduplicationNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbMsgDeduplicationNodeTest.java index 4b9cc0aadd..1a0c3431d7 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbMsgDeduplicationNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbMsgDeduplicationNodeTest.java @@ -62,11 +62,13 @@ import java.util.function.Consumer; import java.util.stream.Stream; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.isNull; import static org.mockito.ArgumentMatchers.nullable; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -411,6 +413,80 @@ public class TbMsgDeduplicationNodeTest extends AbstractRuleNodeUpgradeTest { Assertions.assertEquals(msgWithLatestTsInSecondPack.getType(), actualMsg.getType()); } + @Test + public void given_maxRetriesIsZero_when_enqueueFails_then_noRetriesIsScheduled() throws TbNodeException, ExecutionException, InterruptedException { + int wantedNumberOfTellSelfInvocation = 1; + int msgCount = 1; + awaitTellSelfLatch = new CountDownLatch(wantedNumberOfTellSelfInvocation); + invokeTellSelf(wantedNumberOfTellSelfInvocation); + + // Given + when(ctx.getQueueName()).thenReturn(DataConstants.MAIN_QUEUE_NAME); + config.setInterval(deduplicationInterval); + config.setStrategy(DeduplicationStrategy.FIRST); + config.setMaxPendingMsgs(msgCount); + config.setMaxRetries(0); + nodeConfiguration = new TbNodeConfiguration(JacksonUtil.valueToTree(config)); + node.init(ctx, nodeConfiguration); + + DeviceId deviceId = new DeviceId(UUID.randomUUID()); + long currentTimeMillis = System.currentTimeMillis(); + + doAnswer(invocation -> { + Consumer failureCallback = invocation.getArgument(3); + failureCallback.accept(new RuntimeException("Simulated failure")); + return null; + }).when(ctx).enqueueForTellNext(any(), eq(TbNodeConnectionType.SUCCESS), any(), any()); + + TbMsg msg = createMsg(deviceId, currentTimeMillis + 1); + node.onMsg(ctx, msg); + + awaitTellSelfLatch.await(); + + verify(ctx).enqueueForTellNext(any(), eq(TbNodeConnectionType.SUCCESS), any(), any()); + verify(ctx, never()).schedule(any(), anyLong(), any()); + } + + @Test + public void given_maxRetriesIsSetToOne_when_enqueueFails_then_onlyOneRetryIsScheduled() throws TbNodeException, ExecutionException, InterruptedException { + int wantedNumberOfTellSelfInvocation = 1; + int msgCount = 1; + awaitTellSelfLatch = new CountDownLatch(wantedNumberOfTellSelfInvocation); + invokeTellSelf(wantedNumberOfTellSelfInvocation); + + when(ctx.getQueueName()).thenReturn(DataConstants.MAIN_QUEUE_NAME); + config.setInterval(deduplicationInterval); + config.setStrategy(DeduplicationStrategy.FIRST); + config.setMaxPendingMsgs(msgCount); + config.setMaxRetries(1); + nodeConfiguration = new TbNodeConfiguration(JacksonUtil.valueToTree(config)); + node.init(ctx, nodeConfiguration); + + DeviceId deviceId = new DeviceId(UUID.randomUUID()); + long currentTimeMillis = System.currentTimeMillis(); + + doAnswer(invocation -> { + Consumer failureCallback = invocation.getArgument(3); + failureCallback.accept(new RuntimeException("Simulated failure")); + return null; + }).when(ctx).enqueueForTellNext(any(), eq(TbNodeConnectionType.SUCCESS), any(), any()); + + TbMsg msg = createMsg(deviceId, currentTimeMillis + 1); + node.onMsg(ctx, msg); + + awaitTellSelfLatch.await(); + + ArgumentCaptor retryRunnableCaptor = ArgumentCaptor.forClass(Runnable.class); + verify(ctx).schedule(retryRunnableCaptor.capture(), eq(TbMsgDeduplicationNode.TB_MSG_DEDUPLICATION_RETRY_DELAY), eq(TimeUnit.SECONDS)); + + retryRunnableCaptor.getValue().run(); + + // Verify total enqueue attempts (initial + retry) + verify(ctx, times(2)).enqueueForTellNext(any(), eq(TbNodeConnectionType.SUCCESS), any(), any()); + // No more retries scheduled after reaching maxRetries + verify(ctx).schedule(any(), eq(TbMsgDeduplicationNode.TB_MSG_DEDUPLICATION_RETRY_DELAY), eq(TimeUnit.SECONDS)); + } + // Rule nodes upgrade private static Stream givenFromVersionAndConfig_whenUpgrade_thenVerifyHasChangesAndConfig() { return Stream.of(