fixed flaky testOneActorCreated
This commit is contained in:
parent
20d074e87f
commit
3bd268eaa4
@ -16,12 +16,11 @@
|
|||||||
package org.thingsboard.server.actors;
|
package org.thingsboard.server.actors;
|
||||||
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.awaitility.Awaitility;
|
||||||
import org.junit.jupiter.api.AfterEach;
|
import org.junit.jupiter.api.AfterEach;
|
||||||
import org.junit.jupiter.api.Assertions;
|
import org.junit.jupiter.api.Assertions;
|
||||||
import org.junit.jupiter.api.BeforeEach;
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
import org.junit.jupiter.api.extension.ExtendWith;
|
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
import org.mockito.junit.jupiter.MockitoExtension;
|
|
||||||
import org.thingsboard.common.util.ThingsBoardExecutors;
|
import org.thingsboard.common.util.ThingsBoardExecutors;
|
||||||
import org.thingsboard.common.util.ThingsBoardThreadFactory;
|
import org.thingsboard.common.util.ThingsBoardThreadFactory;
|
||||||
import org.thingsboard.server.common.data.id.DeviceId;
|
import org.thingsboard.server.common.data.id.DeviceId;
|
||||||
@ -37,13 +36,14 @@ import java.util.concurrent.TimeUnit;
|
|||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
|
||||||
|
import static org.assertj.core.api.Assertions.assertThat;
|
||||||
|
|
||||||
@Slf4j
|
@Slf4j
|
||||||
@ExtendWith(MockitoExtension.class)
|
|
||||||
public class ActorSystemTest {
|
public class ActorSystemTest {
|
||||||
|
|
||||||
public static final String ROOT_DISPATCHER = "root-dispatcher";
|
public static final String ROOT_DISPATCHER = "root-dispatcher";
|
||||||
private static final int _100K = 100 * 1024;
|
private static final int _100K = 100 * 1024;
|
||||||
public static final int TIMEOUT_AWAIT_MAX_SEC = 100;
|
public static final int TIMEOUT_AWAIT_MAX_SEC = 30;
|
||||||
|
|
||||||
private volatile TbActorSystem actorSystem;
|
private volatile TbActorSystem actorSystem;
|
||||||
private volatile ExecutorService submitPool;
|
private volatile ExecutorService submitPool;
|
||||||
@ -54,6 +54,7 @@ public class ActorSystemTest {
|
|||||||
public void initActorSystem() {
|
public void initActorSystem() {
|
||||||
int cores = Runtime.getRuntime().availableProcessors();
|
int cores = Runtime.getRuntime().availableProcessors();
|
||||||
parallelism = Math.max(2, cores / 2);
|
parallelism = Math.max(2, cores / 2);
|
||||||
|
log.debug("parallelism {}", parallelism);
|
||||||
TbActorSystemSettings settings = new TbActorSystemSettings(5, parallelism, 42);
|
TbActorSystemSettings settings = new TbActorSystemSettings(5, parallelism, 42);
|
||||||
actorSystem = new DefaultTbActorSystem(settings);
|
actorSystem = new DefaultTbActorSystem(settings);
|
||||||
submitPool = Executors.newFixedThreadPool(parallelism, ThingsBoardThreadFactory.forName(getClass().getSimpleName() + "-submit-test-scope")); //order guaranteed
|
submitPool = Executors.newFixedThreadPool(parallelism, ThingsBoardThreadFactory.forName(getClass().getSimpleName() + "-submit-test-scope")); //order guaranteed
|
||||||
@ -136,25 +137,39 @@ public class ActorSystemTest {
|
|||||||
actorSystem.createDispatcher(ROOT_DISPATCHER, executor);
|
actorSystem.createDispatcher(ROOT_DISPATCHER, executor);
|
||||||
ActorTestCtx testCtx1 = getActorTestCtx(1);
|
ActorTestCtx testCtx1 = getActorTestCtx(1);
|
||||||
ActorTestCtx testCtx2 = getActorTestCtx(1);
|
ActorTestCtx testCtx2 = getActorTestCtx(1);
|
||||||
|
assertThat(testCtx1.getLatch().getCount()).as("testCtx1 latch initial state").isEqualTo(1);
|
||||||
|
assertThat(testCtx2.getLatch().getCount()).as("testCtx2 latch initial state").isEqualTo(1);
|
||||||
TbActorId actorId = new TbEntityActorId(new DeviceId(UUID.randomUUID()));
|
TbActorId actorId = new TbEntityActorId(new DeviceId(UUID.randomUUID()));
|
||||||
final CountDownLatch initLatch = new CountDownLatch(1);
|
final CountDownLatch initLatch = new CountDownLatch(1);
|
||||||
final CountDownLatch actorsReadyLatch = new CountDownLatch(2);
|
final CountDownLatch actorsReadyLatch = new CountDownLatch(2);
|
||||||
submitPool.submit(() -> {
|
submitPool.submit(() -> {
|
||||||
|
log.info("submit 1");
|
||||||
actorSystem.createRootActor(ROOT_DISPATCHER, new SlowCreateActor.SlowCreateActorCreator(actorId, testCtx1, initLatch));
|
actorSystem.createRootActor(ROOT_DISPATCHER, new SlowCreateActor.SlowCreateActorCreator(actorId, testCtx1, initLatch));
|
||||||
actorsReadyLatch.countDown();
|
actorsReadyLatch.countDown();
|
||||||
|
log.info("done 1");
|
||||||
});
|
});
|
||||||
submitPool.submit(() -> {
|
submitPool.submit(() -> {
|
||||||
|
log.info("submit 2");
|
||||||
actorSystem.createRootActor(ROOT_DISPATCHER, new SlowCreateActor.SlowCreateActorCreator(actorId, testCtx2, initLatch));
|
actorSystem.createRootActor(ROOT_DISPATCHER, new SlowCreateActor.SlowCreateActorCreator(actorId, testCtx2, initLatch));
|
||||||
actorsReadyLatch.countDown();
|
actorsReadyLatch.countDown();
|
||||||
|
log.info("done 2");
|
||||||
});
|
});
|
||||||
|
|
||||||
initLatch.countDown(); //replacement for Thread.wait(500) in the SlowCreateActorCreator
|
initLatch.countDown(); //replacement for Thread.wait(500) in the SlowCreateActorCreator
|
||||||
Assertions.assertTrue(actorsReadyLatch.await(TIMEOUT_AWAIT_MAX_SEC, TimeUnit.SECONDS));
|
Assertions.assertTrue(actorsReadyLatch.await(TIMEOUT_AWAIT_MAX_SEC, TimeUnit.SECONDS));
|
||||||
|
log.info("actorsReadyLatch ok");
|
||||||
actorSystem.tell(actorId, new IntTbActorMsg(42));
|
actorSystem.tell(actorId, new IntTbActorMsg(42));
|
||||||
|
|
||||||
Assertions.assertTrue(testCtx1.getLatch().await(TIMEOUT_AWAIT_MAX_SEC, TimeUnit.SECONDS));
|
//only one of two contexts are initialized. no matter Ctx1 or Ctx2
|
||||||
Assertions.assertFalse(testCtx2.getLatch().await(1, TimeUnit.SECONDS));
|
Awaitility.await("one of two actors latch zeroed").atMost(TIMEOUT_AWAIT_MAX_SEC, TimeUnit.SECONDS)
|
||||||
|
.until(() -> testCtx1.getLatch().getCount() + testCtx2.getLatch().getCount() == 1);
|
||||||
|
Thread.yield();
|
||||||
|
if (testCtx1.getLatch().getCount() == 0) {
|
||||||
|
assertThat(testCtx2.getLatch().await(100, TimeUnit.MILLISECONDS)).as("testCtx2 never latched").isFalse();
|
||||||
|
} else {
|
||||||
|
assertThat(testCtx1.getLatch().await(100, TimeUnit.MILLISECONDS)).as("testCtx1 never latched").isFalse();
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@ -233,7 +248,7 @@ public class ActorSystemTest {
|
|||||||
log.info("Submitted all messages");
|
log.info("Submitted all messages");
|
||||||
testCtxes.forEach(ctx -> {
|
testCtxes.forEach(ctx -> {
|
||||||
try {
|
try {
|
||||||
boolean success = ctx.getLatch().await(1, TimeUnit.MINUTES);
|
boolean success = ctx.getLatch().await(TIMEOUT_AWAIT_MAX_SEC, TimeUnit.SECONDS);
|
||||||
if (!success) {
|
if (!success) {
|
||||||
log.warn("Failed: {}, {}", ctx.getActual().get(), ctx.getInvocationCount().get());
|
log.warn("Failed: {}, {}", ctx.getActual().get(), ctx.getInvocationCount().get());
|
||||||
}
|
}
|
||||||
@ -242,7 +257,7 @@ public class ActorSystemTest {
|
|||||||
Assertions.assertEquals(msgNumber, ctx.getInvocationCount().get());
|
Assertions.assertEquals(msgNumber, ctx.getInvocationCount().get());
|
||||||
ctx.clear();
|
ctx.clear();
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
e.printStackTrace();
|
log.error("interrupted", e);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
long duration = System.nanoTime() - start;
|
long duration = System.nanoTime() - start;
|
||||||
|
|||||||
@ -28,11 +28,13 @@ public class SlowCreateActor extends TestRootActor {
|
|||||||
public SlowCreateActor(TbActorId actorId, ActorTestCtx testCtx, CountDownLatch initLatch) {
|
public SlowCreateActor(TbActorId actorId, ActorTestCtx testCtx, CountDownLatch initLatch) {
|
||||||
super(actorId, testCtx);
|
super(actorId, testCtx);
|
||||||
try {
|
try {
|
||||||
|
log.info("awaiting on latch {} ...", initLatch);
|
||||||
initLatch.await(TIMEOUT_AWAIT_MAX_MS, TimeUnit.MILLISECONDS);
|
initLatch.await(TIMEOUT_AWAIT_MAX_MS, TimeUnit.MILLISECONDS);
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
e.printStackTrace();
|
log.error("interrupted", e);
|
||||||
}
|
}
|
||||||
testCtx.getInvocationCount().incrementAndGet();
|
testCtx.getInvocationCount().incrementAndGet();
|
||||||
|
log.info("finished {} ...", initLatch);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static class SlowCreateActorCreator implements TbActorCreator {
|
public static class SlowCreateActorCreator implements TbActorCreator {
|
||||||
@ -54,7 +56,10 @@ public class SlowCreateActor extends TestRootActor {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public TbActor createActor() {
|
public TbActor createActor() {
|
||||||
return new SlowCreateActor(actorId, testCtx, initLatch);
|
log.info("creating slow actor...");
|
||||||
|
SlowCreateActor slowCreateActor = new SlowCreateActor(actorId, testCtx, initLatch);
|
||||||
|
log.info("created slow actor {}", slowCreateActor);
|
||||||
|
return slowCreateActor;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user