InMemoryStorage performance improved. Many test cases added since it is essential piece of code.
This commit is contained in:
parent
b9b4d06376
commit
f8a6751182
@ -67,26 +67,22 @@ public final class DefaultInMemoryStorage implements InMemoryStorage {
|
||||
return storage.computeIfAbsent(topic, (t) -> new LinkedBlockingQueue<>()).add(msg);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public <T extends TbQueueMsg> List<T> get(String topic) throws InterruptedException {
|
||||
if (storage.containsKey(topic)) {
|
||||
List<T> entities;
|
||||
@SuppressWarnings("unchecked")
|
||||
T first = (T) storage.get(topic).poll();
|
||||
if (first != null) {
|
||||
entities = new ArrayList<>();
|
||||
entities.add(first);
|
||||
List<TbQueueMsg> otherList = new ArrayList<>();
|
||||
storage.get(topic).drainTo(otherList, 999);
|
||||
for (TbQueueMsg other : otherList) {
|
||||
@SuppressWarnings("unchecked")
|
||||
T entity = (T) other;
|
||||
entities.add(entity);
|
||||
final BlockingQueue<TbQueueMsg> queue = storage.get(topic);
|
||||
if (queue != null) {
|
||||
final TbQueueMsg firstMsg = queue.poll();
|
||||
if (firstMsg != null) {
|
||||
final int queueSize = queue.size();
|
||||
if (queueSize > 0) {
|
||||
final List<TbQueueMsg> entities = new ArrayList<>(Math.min(queueSize, 999) + 1);
|
||||
entities.add(firstMsg);
|
||||
queue.drainTo(entities, 999);
|
||||
return (List<T>) entities;
|
||||
}
|
||||
} else {
|
||||
entities = Collections.emptyList();
|
||||
return Collections.singletonList((T) firstMsg);
|
||||
}
|
||||
return entities;
|
||||
}
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
@ -30,6 +30,9 @@ import static org.mockito.Mockito.mock;
|
||||
|
||||
@Slf4j
|
||||
public class DefaultInMemoryStorageTest {
|
||||
static final int MAX_POLL_SIZE = 1000;
|
||||
final Gson gson = new Gson();
|
||||
final String topic = "tb_core_notification.tb-node-0";
|
||||
|
||||
InMemoryStorage storage = new DefaultInMemoryStorage();
|
||||
|
||||
@ -47,19 +50,66 @@ public class DefaultInMemoryStorageTest {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenQueue_whenPoll_thenReturnList() throws InterruptedException {
|
||||
Gson gson = new Gson();
|
||||
String topic = "tb_core_notification.tb-node-0";
|
||||
List<TbQueueMsg> msgs = new ArrayList<>(1001);
|
||||
for (int i = 0; i < 1001; i++) {
|
||||
public void givenQueueWithMoreThenBatchSize_whenPoll_thenReturnFullListAndSecondList() throws InterruptedException {
|
||||
List<TbQueueMsg> msgs = new ArrayList<>(MAX_POLL_SIZE + 1);
|
||||
for (int i = 0; i < MAX_POLL_SIZE + 1; i++) {
|
||||
DefaultTbQueueMsg msg = gson.fromJson("{\"key\": \"" + UUID.randomUUID() + "\"}", DefaultTbQueueMsg.class);
|
||||
msgs.add(msg);
|
||||
storage.put(topic, msg);
|
||||
}
|
||||
|
||||
assertThat(storage.getLagTotal()).as("total lag is 1001").isEqualTo(1001);
|
||||
assertThat(storage.get(topic)).as("poll exactly 1000 msgs").isEqualTo(msgs.subList(0, 1000));
|
||||
assertThat(storage.get(topic)).as("poll last 1 message").isEqualTo(msgs.subList(1000, 1001));
|
||||
assertThat(storage.getLagTotal()).as("total lag is 1001").isEqualTo(MAX_POLL_SIZE + 1);
|
||||
assertThat(storage.get(topic)).as("poll exactly 1000 msgs").isEqualTo(msgs.subList(0, MAX_POLL_SIZE));
|
||||
assertThat(storage.get(topic)).as("poll last 1 message").isEqualTo(msgs.subList(MAX_POLL_SIZE, MAX_POLL_SIZE + 1));
|
||||
assertThat(storage.getLagTotal()).as("total lag is zero").isEqualTo(0);
|
||||
}
|
||||
|
||||
private void testPollOnce(final int msgCount) throws InterruptedException {
|
||||
List<TbQueueMsg> msgs = new ArrayList<>(msgCount);
|
||||
for (int i = 0; i < msgCount; i++) {
|
||||
DefaultTbQueueMsg msg = gson.fromJson("{\"key\": \"" + UUID.randomUUID() + "\"}", DefaultTbQueueMsg.class);
|
||||
msgs.add(msg);
|
||||
storage.put(topic, msg);
|
||||
}
|
||||
|
||||
assertThat(storage.getLagTotal()).as("total lag before poll").isEqualTo(msgCount);
|
||||
assertThat(storage.get(topic)).as("polled exactly msgs").isEqualTo(msgs.subList(0, msgCount));
|
||||
assertThat(storage.getLagTotal()).as("final lag is zero").isEqualTo(0);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenQueueWithExactBatchSize_whenPoll_thenReturnExactBatchSizeList() throws InterruptedException {
|
||||
testPollOnce(MAX_POLL_SIZE);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenQueueWithExactBatchSizeMinusOne_whenPoll_thenReturnCorrectSizeList() throws InterruptedException {
|
||||
testPollOnce(MAX_POLL_SIZE - 1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenQueueWithExactBatchSizeMinusTen_whenPoll_thenReturnCorrectSizeList() throws InterruptedException {
|
||||
testPollOnce(MAX_POLL_SIZE - 10);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenQueueEmpty_whenPoll_thenReturnEmptyList() throws InterruptedException {
|
||||
testPollOnce(0);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenQueueWithSingleMessage_whenPoll_thenReturnSingletonList() throws InterruptedException {
|
||||
testPollOnce(1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenQueueWithTwoMessages_whenPoll_thenReturnCorrectSizeList() throws InterruptedException {
|
||||
testPollOnce(2);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenQueueWithTenMessages_whenPoll_thenReturnCorrectSizeList() throws InterruptedException {
|
||||
testPollOnce(10);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user