diff --git a/common/actor/src/test/java/org/thingsboard/server/actors/ActorSystemTest.java b/common/actor/src/test/java/org/thingsboard/server/actors/ActorSystemTest.java index 31fae3d7a4..c37fbb1548 100644 --- a/common/actor/src/test/java/org/thingsboard/server/actors/ActorSystemTest.java +++ b/common/actor/src/test/java/org/thingsboard/server/actors/ActorSystemTest.java @@ -41,6 +41,7 @@ public class ActorSystemTest { public static final String ROOT_DISPATCHER = "root-dispatcher"; private static final int _100K = 100 * 1024; + public static final int TIMEOUT_AWAIT_MAX_SEC = 10; private volatile TbActorSystem actorSystem; private volatile ExecutorService submitPool; @@ -52,7 +53,7 @@ public class ActorSystemTest { parallelism = Math.max(2, cores / 2); TbActorSystemSettings settings = new TbActorSystemSettings(5, parallelism, 42); actorSystem = new DefaultTbActorSystem(settings); - submitPool = Executors.newWorkStealingPool(parallelism); + submitPool = Executors.newFixedThreadPool(parallelism); //order guaranteed } @After @@ -122,13 +123,23 @@ public class ActorSystemTest { ActorTestCtx testCtx1 = getActorTestCtx(1); ActorTestCtx testCtx2 = getActorTestCtx(1); TbActorId actorId = new TbEntityActorId(new DeviceId(UUID.randomUUID())); - submitPool.submit(() -> actorSystem.createRootActor(ROOT_DISPATCHER, new SlowCreateActor.SlowCreateActorCreator(actorId, testCtx1))); - submitPool.submit(() -> actorSystem.createRootActor(ROOT_DISPATCHER, new SlowCreateActor.SlowCreateActorCreator(actorId, testCtx2))); + final CountDownLatch initLatch = new CountDownLatch(1); + final CountDownLatch actorsReadyLatch = new CountDownLatch(2); + submitPool.submit(() -> { + actorSystem.createRootActor(ROOT_DISPATCHER, new SlowCreateActor.SlowCreateActorCreator(actorId, testCtx1, initLatch)); + actorsReadyLatch.countDown(); + }); + submitPool.submit(() -> { + actorSystem.createRootActor(ROOT_DISPATCHER, new SlowCreateActor.SlowCreateActorCreator(actorId, testCtx2, initLatch)); + actorsReadyLatch.countDown(); + }); + + initLatch.countDown(); //replacement for Thread.wait(500) in the SlowCreateActorCreator + Assert.assertTrue(actorsReadyLatch.await(TIMEOUT_AWAIT_MAX_SEC, TimeUnit.SECONDS)); - Thread.sleep(1000); actorSystem.tell(actorId, new IntTbActorMsg(42)); - Assert.assertTrue(testCtx1.getLatch().await(1, TimeUnit.SECONDS)); + Assert.assertTrue(testCtx1.getLatch().await(TIMEOUT_AWAIT_MAX_SEC, TimeUnit.SECONDS)); Assert.assertFalse(testCtx2.getLatch().await(1, TimeUnit.SECONDS)); } @@ -137,13 +148,21 @@ public class ActorSystemTest { actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newWorkStealingPool(parallelism)); ActorTestCtx testCtx = getActorTestCtx(1); TbActorId actorId = new TbEntityActorId(new DeviceId(UUID.randomUUID())); - for (int i = 0; i < 1000; i++) { - submitPool.submit(() -> actorSystem.createRootActor(ROOT_DISPATCHER, new SlowCreateActor.SlowCreateActorCreator(actorId, testCtx))); + final int actorsCount = 1000; + final CountDownLatch initLatch = new CountDownLatch(1); + final CountDownLatch actorsReadyLatch = new CountDownLatch(actorsCount); + for (int i = 0; i < actorsCount; i++) { + submitPool.submit(() -> { + actorSystem.createRootActor(ROOT_DISPATCHER, new SlowCreateActor.SlowCreateActorCreator(actorId, testCtx, initLatch)); + actorsReadyLatch.countDown(); + }); } - Thread.sleep(1000); + initLatch.countDown(); + Assert.assertTrue(actorsReadyLatch.await(TIMEOUT_AWAIT_MAX_SEC, TimeUnit.SECONDS)); + actorSystem.tell(actorId, new IntTbActorMsg(42)); - Assert.assertTrue(testCtx.getLatch().await(1, TimeUnit.SECONDS)); + Assert.assertTrue(testCtx.getLatch().await(TIMEOUT_AWAIT_MAX_SEC, TimeUnit.SECONDS)); //One for creation and one for message Assert.assertEquals(2, testCtx.getInvocationCount().get()); } diff --git a/common/actor/src/test/java/org/thingsboard/server/actors/SlowCreateActor.java b/common/actor/src/test/java/org/thingsboard/server/actors/SlowCreateActor.java index 50eb00a5ca..f14fe8461e 100644 --- a/common/actor/src/test/java/org/thingsboard/server/actors/SlowCreateActor.java +++ b/common/actor/src/test/java/org/thingsboard/server/actors/SlowCreateActor.java @@ -17,13 +17,18 @@ package org.thingsboard.server.actors; import lombok.extern.slf4j.Slf4j; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + @Slf4j public class SlowCreateActor extends TestRootActor { - public SlowCreateActor(TbActorId actorId, ActorTestCtx testCtx) { + public static final int TIMEOUT_AWAIT_MAX_MS = 5000; + + public SlowCreateActor(TbActorId actorId, ActorTestCtx testCtx, CountDownLatch initLatch) { super(actorId, testCtx); try { - Thread.sleep(500); + initLatch.await(TIMEOUT_AWAIT_MAX_MS, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { e.printStackTrace(); } @@ -34,10 +39,12 @@ public class SlowCreateActor extends TestRootActor { private final TbActorId actorId; private final ActorTestCtx testCtx; + private final CountDownLatch initLatch; - public SlowCreateActorCreator(TbActorId actorId, ActorTestCtx testCtx) { + public SlowCreateActorCreator(TbActorId actorId, ActorTestCtx testCtx, CountDownLatch initLatch) { this.actorId = actorId; this.testCtx = testCtx; + this.initLatch = initLatch; } @Override @@ -47,7 +54,7 @@ public class SlowCreateActor extends TestRootActor { @Override public TbActor createActor() { - return new SlowCreateActor(actorId, testCtx); + return new SlowCreateActor(actorId, testCtx, initLatch); } } }