diff --git a/common/actor/src/test/java/org/thingsboard/server/actors/ActorSystemTest.java b/common/actor/src/test/java/org/thingsboard/server/actors/ActorSystemTest.java index 14a844ed73..75bf5bab6e 100644 --- a/common/actor/src/test/java/org/thingsboard/server/actors/ActorSystemTest.java +++ b/common/actor/src/test/java/org/thingsboard/server/actors/ActorSystemTest.java @@ -16,12 +16,11 @@ package org.thingsboard.server.actors; import lombok.extern.slf4j.Slf4j; +import org.awaitility.Awaitility; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.Test; -import org.mockito.junit.jupiter.MockitoExtension; import org.thingsboard.common.util.ThingsBoardExecutors; import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.server.common.data.id.DeviceId; @@ -37,13 +36,14 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import static org.assertj.core.api.Assertions.assertThat; + @Slf4j -@ExtendWith(MockitoExtension.class) public class ActorSystemTest { public static final String ROOT_DISPATCHER = "root-dispatcher"; private static final int _100K = 100 * 1024; - public static final int TIMEOUT_AWAIT_MAX_SEC = 100; + public static final int TIMEOUT_AWAIT_MAX_SEC = 30; private volatile TbActorSystem actorSystem; private volatile ExecutorService submitPool; @@ -54,6 +54,7 @@ public class ActorSystemTest { public void initActorSystem() { int cores = Runtime.getRuntime().availableProcessors(); parallelism = Math.max(2, cores / 2); + log.debug("parallelism {}", parallelism); TbActorSystemSettings settings = new TbActorSystemSettings(5, parallelism, 42); actorSystem = new DefaultTbActorSystem(settings); submitPool = Executors.newFixedThreadPool(parallelism, ThingsBoardThreadFactory.forName(getClass().getSimpleName() + "-submit-test-scope")); //order guaranteed @@ -136,25 +137,39 @@ public class ActorSystemTest { actorSystem.createDispatcher(ROOT_DISPATCHER, executor); ActorTestCtx testCtx1 = getActorTestCtx(1); ActorTestCtx testCtx2 = getActorTestCtx(1); + assertThat(testCtx1.getLatch().getCount()).as("testCtx1 latch initial state").isEqualTo(1); + assertThat(testCtx2.getLatch().getCount()).as("testCtx2 latch initial state").isEqualTo(1); TbActorId actorId = new TbEntityActorId(new DeviceId(UUID.randomUUID())); final CountDownLatch initLatch = new CountDownLatch(1); final CountDownLatch actorsReadyLatch = new CountDownLatch(2); submitPool.submit(() -> { + log.info("submit 1"); actorSystem.createRootActor(ROOT_DISPATCHER, new SlowCreateActor.SlowCreateActorCreator(actorId, testCtx1, initLatch)); actorsReadyLatch.countDown(); + log.info("done 1"); }); submitPool.submit(() -> { + log.info("submit 2"); actorSystem.createRootActor(ROOT_DISPATCHER, new SlowCreateActor.SlowCreateActorCreator(actorId, testCtx2, initLatch)); actorsReadyLatch.countDown(); + log.info("done 2"); }); initLatch.countDown(); //replacement for Thread.wait(500) in the SlowCreateActorCreator Assertions.assertTrue(actorsReadyLatch.await(TIMEOUT_AWAIT_MAX_SEC, TimeUnit.SECONDS)); - + log.info("actorsReadyLatch ok"); actorSystem.tell(actorId, new IntTbActorMsg(42)); - Assertions.assertTrue(testCtx1.getLatch().await(TIMEOUT_AWAIT_MAX_SEC, TimeUnit.SECONDS)); - Assertions.assertFalse(testCtx2.getLatch().await(1, TimeUnit.SECONDS)); + //only one of two contexts are initialized. no matter Ctx1 or Ctx2 + Awaitility.await("one of two actors latch zeroed").atMost(TIMEOUT_AWAIT_MAX_SEC, TimeUnit.SECONDS) + .until(() -> testCtx1.getLatch().getCount() + testCtx2.getLatch().getCount() == 1); + Thread.yield(); + if (testCtx1.getLatch().getCount() == 0) { + assertThat(testCtx2.getLatch().await(100, TimeUnit.MILLISECONDS)).as("testCtx2 never latched").isFalse(); + } else { + assertThat(testCtx1.getLatch().await(100, TimeUnit.MILLISECONDS)).as("testCtx1 never latched").isFalse(); + } + } @Test @@ -233,7 +248,7 @@ public class ActorSystemTest { log.info("Submitted all messages"); testCtxes.forEach(ctx -> { try { - boolean success = ctx.getLatch().await(1, TimeUnit.MINUTES); + boolean success = ctx.getLatch().await(TIMEOUT_AWAIT_MAX_SEC, TimeUnit.SECONDS); if (!success) { log.warn("Failed: {}, {}", ctx.getActual().get(), ctx.getInvocationCount().get()); } @@ -242,7 +257,7 @@ public class ActorSystemTest { Assertions.assertEquals(msgNumber, ctx.getInvocationCount().get()); ctx.clear(); } catch (InterruptedException e) { - e.printStackTrace(); + log.error("interrupted", e); } }); long duration = System.nanoTime() - start; diff --git a/common/actor/src/test/java/org/thingsboard/server/actors/SlowCreateActor.java b/common/actor/src/test/java/org/thingsboard/server/actors/SlowCreateActor.java index 332224095f..77893ec625 100644 --- a/common/actor/src/test/java/org/thingsboard/server/actors/SlowCreateActor.java +++ b/common/actor/src/test/java/org/thingsboard/server/actors/SlowCreateActor.java @@ -28,11 +28,13 @@ public class SlowCreateActor extends TestRootActor { public SlowCreateActor(TbActorId actorId, ActorTestCtx testCtx, CountDownLatch initLatch) { super(actorId, testCtx); try { + log.info("awaiting on latch {} ...", initLatch); initLatch.await(TIMEOUT_AWAIT_MAX_MS, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { - e.printStackTrace(); + log.error("interrupted", e); } testCtx.getInvocationCount().incrementAndGet(); + log.info("finished {} ...", initLatch); } public static class SlowCreateActorCreator implements TbActorCreator { @@ -54,7 +56,10 @@ public class SlowCreateActor extends TestRootActor { @Override public TbActor createActor() { - return new SlowCreateActor(actorId, testCtx, initLatch); + log.info("creating slow actor..."); + SlowCreateActor slowCreateActor = new SlowCreateActor(actorId, testCtx, initLatch); + log.info("created slow actor {}", slowCreateActor); + return slowCreateActor; } } }