Improvements for queue-related tests
This commit is contained in:
parent
60900a0b96
commit
9ca568d070
@ -687,7 +687,7 @@ public class TenantControllerTest extends AbstractControllerTest {
|
|||||||
submittedMsgs.add(tbMsg.getId());
|
submittedMsgs.add(tbMsg.getId());
|
||||||
Thread.sleep(timeLeft / msgs);
|
Thread.sleep(timeLeft / msgs);
|
||||||
}
|
}
|
||||||
await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> {
|
await().atMost(15, TimeUnit.SECONDS).untilAsserted(() -> {
|
||||||
verify(queueAdmin, times(1)).deleteTopic(eq(isolatedTopic));
|
verify(queueAdmin, times(1)).deleteTopic(eq(isolatedTopic));
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|||||||
@ -456,8 +456,7 @@ public class TbRuleEngineQueueConsumerManagerTest {
|
|||||||
clearInvocations(actorContext);
|
clearInvocations(actorContext);
|
||||||
verify(consumer1, never()).unsubscribe();
|
verify(consumer1, never()).unsubscribe();
|
||||||
verify(consumer2, never()).unsubscribe();
|
verify(consumer2, never()).unsubscribe();
|
||||||
int msgCount1 = consumer1.msgCount;
|
int msgCount = totalConsumedMsgs.get();
|
||||||
int msgCount2 = consumer2.msgCount;
|
|
||||||
|
|
||||||
await().atLeast(4, TimeUnit.SECONDS) // based on topicDeletionDelayInSec
|
await().atLeast(4, TimeUnit.SECONDS) // based on topicDeletionDelayInSec
|
||||||
.atMost(7, TimeUnit.SECONDS)
|
.atMost(7, TimeUnit.SECONDS)
|
||||||
@ -471,10 +470,8 @@ public class TbRuleEngineQueueConsumerManagerTest {
|
|||||||
verify(consumer1).unsubscribe();
|
verify(consumer1).unsubscribe();
|
||||||
verify(consumer2).unsubscribe();
|
verify(consumer2).unsubscribe();
|
||||||
|
|
||||||
int movedMsgs1 = consumer1.msgCount - msgCount1;
|
int totalMovedMsgs = totalConsumedMsgs.get() - msgCount;
|
||||||
int movedMsgs2 = consumer2.msgCount - msgCount2;
|
assertThat(totalMovedMsgs).isNotZero();
|
||||||
int totalMovedMsgs = movedMsgs1 + movedMsgs2;
|
|
||||||
assertThat(totalMovedMsgs).isGreaterThan(10);
|
|
||||||
verify(ruleEngineMsgProducer, atLeast(totalMovedMsgs)).send(any(), any(), any());
|
verify(ruleEngineMsgProducer, atLeast(totalMovedMsgs)).send(any(), any(), any());
|
||||||
verify(actorContext, never()).tell(any());
|
verify(actorContext, never()).tell(any());
|
||||||
generateQueueMsgs = false;
|
generateQueueMsgs = false;
|
||||||
@ -499,7 +496,7 @@ public class TbRuleEngineQueueConsumerManagerTest {
|
|||||||
});
|
});
|
||||||
clearInvocations(actorContext);
|
clearInvocations(actorContext);
|
||||||
verify(consumer, never()).unsubscribe();
|
verify(consumer, never()).unsubscribe();
|
||||||
int msgCount = consumer.msgCount;
|
int msgCount = totalConsumedMsgs.get();
|
||||||
|
|
||||||
await().atLeast(4, TimeUnit.SECONDS)
|
await().atLeast(4, TimeUnit.SECONDS)
|
||||||
.atMost(7, TimeUnit.SECONDS)
|
.atMost(7, TimeUnit.SECONDS)
|
||||||
@ -512,7 +509,8 @@ public class TbRuleEngineQueueConsumerManagerTest {
|
|||||||
});
|
});
|
||||||
verify(consumer).unsubscribe();
|
verify(consumer).unsubscribe();
|
||||||
|
|
||||||
int movedMsgs = consumer.msgCount - msgCount;
|
int movedMsgs = totalConsumedMsgs.get() - msgCount;
|
||||||
|
assertThat(movedMsgs).isNotZero();
|
||||||
verify(ruleEngineMsgProducer, atLeast(movedMsgs)).send(any(), any(), any());
|
verify(ruleEngineMsgProducer, atLeast(movedMsgs)).send(any(), any(), any());
|
||||||
verify(actorContext, never()).tell(any());
|
verify(actorContext, never()).tell(any());
|
||||||
generateQueueMsgs = false;
|
generateQueueMsgs = false;
|
||||||
@ -607,11 +605,6 @@ public class TbRuleEngineQueueConsumerManagerTest {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
|
||||||
* 2023-10-15 18:34:06,090 [main] INFO o.t.s.s.q.r.TbRuleEngineQueueConsumerManagerTest - Generated new partitions: [0, 1, 2, 3, 4, 5, 6, 8, 9, 11, 12, 13, 15, 16, 17, 18, 19]
|
|
||||||
2023-10-15 18:34:06,090 [main] INFO o.t.s.s.q.r.TbRuleEngineQueueConsumerManagerTest - Generated new config: consumerPerPartition=false, pollInterval=299, processingStrategy=RETRY_FAILED
|
|
||||||
* */
|
|
||||||
|
|
||||||
private void verifySubscribedAndLaunched(TestConsumer consumer, Set<TopicPartitionInfo> expectedPartitions) {
|
private void verifySubscribedAndLaunched(TestConsumer consumer, Set<TopicPartitionInfo> expectedPartitions) {
|
||||||
await().atMost(2, TimeUnit.SECONDS)
|
await().atMost(2, TimeUnit.SECONDS)
|
||||||
.until(() -> consumer.subscribed && consumer.getPartitions().equals(expectedPartitions) && consumer.pollingStarted);
|
.until(() -> consumer.subscribed && consumer.getPartitions().equals(expectedPartitions) && consumer.pollingStarted);
|
||||||
@ -701,7 +694,6 @@ public class TbRuleEngineQueueConsumerManagerTest {
|
|||||||
private boolean pollingStarted;
|
private boolean pollingStarted;
|
||||||
|
|
||||||
private TbMsg testMsg;
|
private TbMsg testMsg;
|
||||||
private int msgCount;
|
|
||||||
|
|
||||||
public TestConsumer(String topic) {
|
public TestConsumer(String topic) {
|
||||||
super(topic);
|
super(topic);
|
||||||
@ -746,7 +738,6 @@ public class TbRuleEngineQueueConsumerManagerTest {
|
|||||||
if (!subscribed) {
|
if (!subscribed) {
|
||||||
throw new IllegalStateException("Cannot commit because not subscribed");
|
throw new IllegalStateException("Cannot commit because not subscribed");
|
||||||
}
|
}
|
||||||
msgCount++;
|
|
||||||
log.debug("doCommit() totalConsumedMsgs = {}", totalConsumedMsgs.incrementAndGet());
|
log.debug("doCommit() totalConsumedMsgs = {}", totalConsumedMsgs.incrementAndGet());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user