deduplication node: fixed retry mechanism

This commit is contained in:
dshvaika 2025-05-20 12:27:26 +03:00
parent 2a79a12279
commit 440087384d
2 changed files with 85 additions and 8 deletions

View File

@ -64,7 +64,7 @@ import static org.thingsboard.server.common.data.DataConstants.QUEUE_NAME;
@Slf4j @Slf4j
public class TbMsgDeduplicationNode implements TbNode { public class TbMsgDeduplicationNode implements TbNode {
public static final int TB_MSG_DEDUPLICATION_RETRY_DELAY = 10; public static final long TB_MSG_DEDUPLICATION_RETRY_DELAY = 10L;
private TbMsgDeduplicationNodeConfiguration config; private TbMsgDeduplicationNodeConfiguration config;
@ -217,16 +217,17 @@ public class TbMsgDeduplicationNode implements TbNode {
} }
private void enqueueForTellNextWithRetry(TbContext ctx, TbMsg msg, int retryAttempt) { private void enqueueForTellNextWithRetry(TbContext ctx, TbMsg msg, int retryAttempt) {
if (config.getMaxRetries() > retryAttempt) { if (retryAttempt <= config.getMaxRetries()) {
ctx.enqueueForTellNext(msg, TbNodeConnectionType.SUCCESS, ctx.enqueueForTellNext(msg, TbNodeConnectionType.SUCCESS,
() -> { () -> log.trace("[{}][{}][{}] Successfully enqueue deduplication result message!", ctx.getSelfId(), msg.getOriginator(), retryAttempt),
log.trace("[{}][{}][{}] Successfully enqueue deduplication result message!", ctx.getSelfId(), msg.getOriginator(), retryAttempt);
},
throwable -> { throwable -> {
log.trace("[{}][{}][{}] Failed to enqueue deduplication output message due to: ", ctx.getSelfId(), msg.getOriginator(), retryAttempt, throwable); log.trace("[{}][{}][{}] Failed to enqueue deduplication output message due to: ", ctx.getSelfId(), msg.getOriginator(), retryAttempt, throwable);
ctx.schedule(() -> { if (retryAttempt < config.getMaxRetries()) {
enqueueForTellNextWithRetry(ctx, msg, retryAttempt + 1); ctx.schedule(() -> enqueueForTellNextWithRetry(ctx, msg, retryAttempt + 1), TB_MSG_DEDUPLICATION_RETRY_DELAY, TimeUnit.SECONDS);
}, TB_MSG_DEDUPLICATION_RETRY_DELAY, TimeUnit.SECONDS); } else {
log.trace("[{}][{}] Max retries [{}] exhausted. Dropping deduplication result message [{}]",
ctx.getSelfId(), msg.getOriginator(), config.getMaxRetries(), msg.getId());
}
}); });
} }
} }

View File

@ -62,11 +62,13 @@ import java.util.function.Consumer;
import java.util.stream.Stream; import java.util.stream.Stream;
import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isNull; import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.ArgumentMatchers.nullable; import static org.mockito.ArgumentMatchers.nullable;
import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy; import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times; import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
@ -411,6 +413,80 @@ public class TbMsgDeduplicationNodeTest extends AbstractRuleNodeUpgradeTest {
Assertions.assertEquals(msgWithLatestTsInSecondPack.getType(), actualMsg.getType()); Assertions.assertEquals(msgWithLatestTsInSecondPack.getType(), actualMsg.getType());
} }
@Test
public void given_maxRetriesIsZero_when_enqueueFails_then_noRetriesIsScheduled() throws TbNodeException, ExecutionException, InterruptedException {
int wantedNumberOfTellSelfInvocation = 1;
int msgCount = 1;
awaitTellSelfLatch = new CountDownLatch(wantedNumberOfTellSelfInvocation);
invokeTellSelf(wantedNumberOfTellSelfInvocation);
// Given
when(ctx.getQueueName()).thenReturn(DataConstants.MAIN_QUEUE_NAME);
config.setInterval(deduplicationInterval);
config.setStrategy(DeduplicationStrategy.FIRST);
config.setMaxPendingMsgs(msgCount);
config.setMaxRetries(0);
nodeConfiguration = new TbNodeConfiguration(JacksonUtil.valueToTree(config));
node.init(ctx, nodeConfiguration);
DeviceId deviceId = new DeviceId(UUID.randomUUID());
long currentTimeMillis = System.currentTimeMillis();
doAnswer(invocation -> {
Consumer<Throwable> failureCallback = invocation.getArgument(3);
failureCallback.accept(new RuntimeException("Simulated failure"));
return null;
}).when(ctx).enqueueForTellNext(any(), eq(TbNodeConnectionType.SUCCESS), any(), any());
TbMsg msg = createMsg(deviceId, currentTimeMillis + 1);
node.onMsg(ctx, msg);
awaitTellSelfLatch.await();
verify(ctx).enqueueForTellNext(any(), eq(TbNodeConnectionType.SUCCESS), any(), any());
verify(ctx, never()).schedule(any(), anyLong(), any());
}
@Test
public void given_maxRetriesIsSetToOne_when_enqueueFails_then_onlyOneRetryIsScheduled() throws TbNodeException, ExecutionException, InterruptedException {
int wantedNumberOfTellSelfInvocation = 1;
int msgCount = 1;
awaitTellSelfLatch = new CountDownLatch(wantedNumberOfTellSelfInvocation);
invokeTellSelf(wantedNumberOfTellSelfInvocation);
when(ctx.getQueueName()).thenReturn(DataConstants.MAIN_QUEUE_NAME);
config.setInterval(deduplicationInterval);
config.setStrategy(DeduplicationStrategy.FIRST);
config.setMaxPendingMsgs(msgCount);
config.setMaxRetries(1);
nodeConfiguration = new TbNodeConfiguration(JacksonUtil.valueToTree(config));
node.init(ctx, nodeConfiguration);
DeviceId deviceId = new DeviceId(UUID.randomUUID());
long currentTimeMillis = System.currentTimeMillis();
doAnswer(invocation -> {
Consumer<Throwable> failureCallback = invocation.getArgument(3);
failureCallback.accept(new RuntimeException("Simulated failure"));
return null;
}).when(ctx).enqueueForTellNext(any(), eq(TbNodeConnectionType.SUCCESS), any(), any());
TbMsg msg = createMsg(deviceId, currentTimeMillis + 1);
node.onMsg(ctx, msg);
awaitTellSelfLatch.await();
ArgumentCaptor<Runnable> retryRunnableCaptor = ArgumentCaptor.forClass(Runnable.class);
verify(ctx).schedule(retryRunnableCaptor.capture(), eq(TbMsgDeduplicationNode.TB_MSG_DEDUPLICATION_RETRY_DELAY), eq(TimeUnit.SECONDS));
retryRunnableCaptor.getValue().run();
// Verify total enqueue attempts (initial + retry)
verify(ctx, times(2)).enqueueForTellNext(any(), eq(TbNodeConnectionType.SUCCESS), any(), any());
// No more retries scheduled after reaching maxRetries
verify(ctx).schedule(any(), eq(TbMsgDeduplicationNode.TB_MSG_DEDUPLICATION_RETRY_DELAY), eq(TimeUnit.SECONDS));
}
// Rule nodes upgrade // Rule nodes upgrade
private static Stream<Arguments> givenFromVersionAndConfig_whenUpgrade_thenVerifyHasChangesAndConfig() { private static Stream<Arguments> givenFromVersionAndConfig_whenUpgrade_thenVerifyHasChangesAndConfig() {
return Stream.of( return Stream.of(