diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java index 7b9e1d0fd0..de368fd05a 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java @@ -514,6 +514,9 @@ public class ActorSystemContext { appActor.tell(tbActorMsg); } + public void tellWithHighPriority(TbActorMsg tbActorMsg) { + appActor.tellWithHighPriority(tbActorMsg); + } public void schedulePeriodicMsgWithDelay(TbActorRef ctx, TbActorMsg msg, long delayInMs, long periodInMs) { log.debug("Scheduling periodic msg {} every {} ms with delay {} ms", msg, periodInMs, delayInMs); diff --git a/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java b/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java index 3976349de4..953188f3f7 100644 --- a/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java @@ -82,12 +82,14 @@ public class AppActor extends ContextAwareActor { onQueueToRuleEngineMsg((QueueToRuleEngineMsg) msg); break; case TRANSPORT_TO_DEVICE_ACTOR_MSG: + onToDeviceActorMsg((TenantAwareMsg) msg, false); + break; case DEVICE_ATTRIBUTES_UPDATE_TO_DEVICE_ACTOR_MSG: case DEVICE_CREDENTIALS_UPDATE_TO_DEVICE_ACTOR_MSG: case DEVICE_NAME_OR_TYPE_UPDATE_TO_DEVICE_ACTOR_MSG: case DEVICE_RPC_REQUEST_TO_DEVICE_ACTOR_MSG: case SERVER_RPC_RESPONSE_TO_DEVICE_ACTOR_MSG: - onToDeviceActorMsg((TenantAwareMsg) msg); + onToDeviceActorMsg((TenantAwareMsg) msg, true); break; default: return false; @@ -155,15 +157,20 @@ public class AppActor extends ContextAwareActor { } } if (target != null) { - target.tell(msg); + target.tellWithHighPriority(msg); } else { log.debug("[{}] Invalid component lifecycle msg: {}", msg.getTenantId(), msg); } } - private void onToDeviceActorMsg(TenantAwareMsg msg) { + private void onToDeviceActorMsg(TenantAwareMsg msg, boolean priority) { if (!deletedTenants.contains(msg.getTenantId())) { - getOrCreateTenantActor(msg.getTenantId()).tell(msg); + TbActorRef tenantActor = getOrCreateTenantActor(msg.getTenantId()); + if (priority) { + tenantActor.tellWithHighPriority(msg); + } else { + tenantActor.tell(msg); + } } else { if (msg instanceof TransportToDeviceActorMsgWrapper) { ((TransportToDeviceActorMsgWrapper) msg).getCallback().onSuccess(); diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java index 4c59b1fa40..afb50a18aa 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java @@ -128,7 +128,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor { log.trace("[{}][{}] Removing rule node [{}]", tenantId, entityId, ruleNodeId); RuleNodeCtx removed = nodeActors.remove(ruleNodeId); - removed.getSelfActor().tell(new ComponentLifecycleMsg(tenantId, removed.getSelf().getId(), ComponentLifecycleEvent.DELETED)); + removed.getSelfActor().tellWithHighPriority(new ComponentLifecycleMsg(tenantId, removed.getSelf().getId(), ComponentLifecycleEvent.DELETED)); }); initRoutes(ruleChain, ruleNodeList); @@ -155,7 +155,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor actorRef.tell(msg)); + nodeActors.values().stream().map(RuleNodeCtx::getSelfActor).forEach(actorRef -> actorRef.tellWithHighPriority(msg)); } private TbActorRef createRuleNodeActor(TbActorCtx ctx, RuleNode ruleNode) { diff --git a/application/src/main/java/org/thingsboard/server/actors/service/ComponentActor.java b/application/src/main/java/org/thingsboard/server/actors/service/ComponentActor.java index ec4611a8c8..da560cef97 100644 --- a/application/src/main/java/org/thingsboard/server/actors/service/ComponentActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/service/ComponentActor.java @@ -83,7 +83,9 @@ public abstract class ComponentActor actorMsg = encodingService.decode(toCoreNotification.getComponentLifecycleMsg().toByteArray()); if (actorMsg.isPresent()) { log.trace("[{}] Forwarding message to App Actor {}", id, actorMsg.get()); - actorContext.tell(actorMsg.get()); + actorContext.tellWithHighPriority(actorMsg.get()); } callback.onSuccess(); } diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java index 1e5933abad..b98f043a0d 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java @@ -230,7 +230,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService< Optional actorMsg = encodingService.decode(nfMsg.getComponentLifecycleMsg().toByteArray()); if (actorMsg.isPresent()) { log.trace("[{}] Forwarding message to App Actor {}", id, actorMsg.get()); - actorContext.tell(actorMsg.get()); + actorContext.tellWithHighPriority(actorMsg.get()); } callback.onSuccess(); } else if (nfMsg.hasFromDeviceRpcResponse()) { diff --git a/application/src/main/java/org/thingsboard/server/service/rpc/DefaultTbCoreDeviceRpcService.java b/application/src/main/java/org/thingsboard/server/service/rpc/DefaultTbCoreDeviceRpcService.java index 3ac6fea2f7..b5e273c223 100644 --- a/application/src/main/java/org/thingsboard/server/service/rpc/DefaultTbCoreDeviceRpcService.java +++ b/application/src/main/java/org/thingsboard/server/service/rpc/DefaultTbCoreDeviceRpcService.java @@ -121,7 +121,7 @@ public class DefaultTbCoreDeviceRpcService implements TbCoreDeviceRpcService { log.trace("[{}][{}] Processing local rpc call to device actor [{}]", request.getTenantId(), request.getId(), request.getDeviceId()); UUID requestId = request.getId(); localToDeviceRpcRequests.put(requestId, rpcMsg); - actorContext.tell(rpcMsg); + actorContext.tellWithHighPriority(rpcMsg); scheduleToDeviceTimeout(request, requestId); } @@ -175,7 +175,7 @@ public class DefaultTbCoreDeviceRpcService implements TbCoreDeviceRpcService { } private void scheduleToRuleEngineTimeout(ToDeviceRpcRequest request, UUID requestId) { - long timeout = Math.max(0, request.getExpirationTime() - System.currentTimeMillis()); + long timeout = Math.max(0, request.getExpirationTime() - System.currentTimeMillis()) + TimeUnit.SECONDS.toMillis(1); log.trace("[{}] processing to rule engine request.", requestId); scheduler.schedule(() -> { log.trace("[{}] timeout for processing to rule engine request.", requestId); @@ -187,7 +187,7 @@ public class DefaultTbCoreDeviceRpcService implements TbCoreDeviceRpcService { } private void scheduleToDeviceTimeout(ToDeviceRpcRequest request, UUID requestId) { - long timeout = Math.max(0, request.getExpirationTime() - System.currentTimeMillis()); + long timeout = Math.max(0, request.getExpirationTime() - System.currentTimeMillis()) + TimeUnit.SECONDS.toMillis(1); log.trace("[{}] processing to device request.", requestId); scheduler.schedule(() -> { log.trace("[{}] timeout for to device request.", requestId); diff --git a/application/src/main/java/org/thingsboard/server/service/rpc/DefaultTbRuleEngineRpcService.java b/application/src/main/java/org/thingsboard/server/service/rpc/DefaultTbRuleEngineRpcService.java index 0ec730b7dc..80d47a84e8 100644 --- a/application/src/main/java/org/thingsboard/server/service/rpc/DefaultTbRuleEngineRpcService.java +++ b/application/src/main/java/org/thingsboard/server/service/rpc/DefaultTbRuleEngineRpcService.java @@ -164,7 +164,7 @@ public class DefaultTbRuleEngineRpcService implements TbRuleEngineDeviceRpcServi } private void scheduleTimeout(ToDeviceRpcRequest request, UUID requestId) { - long timeout = Math.max(0, request.getExpirationTime() - System.currentTimeMillis()); + long timeout = Math.max(0, request.getExpirationTime() - System.currentTimeMillis()) + TimeUnit.SECONDS.toMillis(1); log.trace("[{}] processing the request: [{}]", this.hashCode(), requestId); scheduler.schedule(() -> { log.trace("[{}] timeout the request: [{}]", this.hashCode(), requestId); diff --git a/application/src/test/java/org/thingsboard/server/mqtt/MqttSqlTestSuite.java b/application/src/test/java/org/thingsboard/server/mqtt/MqttSqlTestSuite.java index 69a6e99353..0e7234c716 100644 --- a/application/src/test/java/org/thingsboard/server/mqtt/MqttSqlTestSuite.java +++ b/application/src/test/java/org/thingsboard/server/mqtt/MqttSqlTestSuite.java @@ -26,7 +26,9 @@ import java.util.Arrays; @RunWith(ClasspathSuite.class) @ClasspathSuite.ClassnameFilters({ - "org.thingsboard.server.mqtt.rpc.sql.*Test", "org.thingsboard.server.mqtt.telemetry.sql.*Test"}) + "org.thingsboard.server.mqtt.rpc.sql.*Test", + "org.thingsboard.server.mqtt.telemetry.sql.*Test" +}) public class MqttSqlTestSuite { @ClassRule diff --git a/application/src/test/java/org/thingsboard/server/mqtt/rpc/AbstractMqttServerSideRpcIntegrationTest.java b/application/src/test/java/org/thingsboard/server/mqtt/rpc/AbstractMqttServerSideRpcIntegrationTest.java index d27b603aea..abd13f99a6 100644 --- a/application/src/test/java/org/thingsboard/server/mqtt/rpc/AbstractMqttServerSideRpcIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/mqtt/rpc/AbstractMqttServerSideRpcIntegrationTest.java @@ -136,7 +136,7 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractC String setGpioRequest = "{\"method\":\"setGpio\",\"params\":{\"pin\": \"24\",\"value\": 1},\"timeout\": 6000}"; String deviceId = savedDevice.getId().getId().toString(); - doPostAsync("/api/plugins/rpc/oneway/" + deviceId, setGpioRequest, String.class, status().isRequestTimeout(), + doPostAsync("/api/plugins/rpc/oneway/" + deviceId, setGpioRequest, String.class, status().is(409), asyncContextTimeoutToUseRpcPlugin); } @@ -193,7 +193,7 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractC String setGpioRequest = "{\"method\":\"setGpio\",\"params\":{\"pin\": \"27\",\"value\": 1},\"timeout\": 6000}"; String deviceId = savedDevice.getId().getId().toString(); - doPostAsync("/api/plugins/rpc/twoway/" + deviceId, setGpioRequest, String.class, status().isRequestTimeout(), + doPostAsync("/api/plugins/rpc/twoway/" + deviceId, setGpioRequest, String.class, status().is(409), asyncContextTimeoutToUseRpcPlugin); } diff --git a/common/actor/src/main/java/org/thingsboard/server/actors/DefaultTbActorSystem.java b/common/actor/src/main/java/org/thingsboard/server/actors/DefaultTbActorSystem.java index f291c8e804..a642ccec69 100644 --- a/common/actor/src/main/java/org/thingsboard/server/actors/DefaultTbActorSystem.java +++ b/common/actor/src/main/java/org/thingsboard/server/actors/DefaultTbActorSystem.java @@ -132,19 +132,28 @@ public class DefaultTbActorSystem implements TbActorSystem { } @Override - public void tell(TbActorRef target, TbActorMsg actorMsg) { - target.tell(actorMsg); + public void tellWithHighPriority(TbActorId target, TbActorMsg actorMsg) { + tell(target, actorMsg, true); } @Override public void tell(TbActorId target, TbActorMsg actorMsg) { + tell(target, actorMsg, false); + } + + private void tell(TbActorId target, TbActorMsg actorMsg, boolean highPriority) { TbActorMailbox mailbox = actors.get(target); if (mailbox == null) { throw new TbActorNotRegisteredException(target, "Actor with id [" + target + "] is not registered!"); } - mailbox.enqueue(actorMsg); + if (highPriority) { + mailbox.tellWithHighPriority(actorMsg); + } else { + mailbox.tell(actorMsg); + } } + @Override public void broadcastToChildren(TbActorId parent, TbActorMsg msg) { broadcastToChildren(parent, id -> true, msg); diff --git a/common/actor/src/main/java/org/thingsboard/server/actors/TbActorMailbox.java b/common/actor/src/main/java/org/thingsboard/server/actors/TbActorMailbox.java index 3931fa2535..279332adf3 100644 --- a/common/actor/src/main/java/org/thingsboard/server/actors/TbActorMailbox.java +++ b/common/actor/src/main/java/org/thingsboard/server/actors/TbActorMailbox.java @@ -29,6 +29,9 @@ import java.util.function.Supplier; @Slf4j @Data public final class TbActorMailbox implements TbActorCtx { + private static final boolean HIGH_PRIORITY = true; + private static final boolean NORMAL_PRIORITY = false; + private static final boolean FREE = false; private static final boolean BUSY = true; @@ -41,7 +44,8 @@ public final class TbActorMailbox implements TbActorCtx { private final TbActorRef parentRef; private final TbActor actor; private final Dispatcher dispatcher; - private final ConcurrentLinkedQueue msgs = new ConcurrentLinkedQueue<>(); + private final ConcurrentLinkedQueue highPriorityMsgs = new ConcurrentLinkedQueue<>(); + private final ConcurrentLinkedQueue normalPriorityMsgs = new ConcurrentLinkedQueue<>(); private final AtomicBoolean busy = new AtomicBoolean(FREE); private final AtomicBoolean ready = new AtomicBoolean(NOT_READY); private final AtomicBoolean destroyInProgress = new AtomicBoolean(); @@ -50,7 +54,6 @@ public final class TbActorMailbox implements TbActorCtx { dispatcher.getExecutor().execute(() -> tryInit(1)); } - private void tryInit(int attempt) { try { log.debug("[{}] Trying to init actor, attempt: {}", selfId, attempt); @@ -78,23 +81,38 @@ public final class TbActorMailbox implements TbActorCtx { } } - public void enqueue(TbActorMsg msg) { - msgs.add(msg); + private void enqueue(TbActorMsg msg, boolean highPriority) { + if (highPriority) { + highPriorityMsgs.add(msg); + } else { + normalPriorityMsgs.add(msg); + } tryProcessQueue(true); } private void tryProcessQueue(boolean newMsg) { - if (ready.get() == READY && (newMsg || !msgs.isEmpty()) && busy.compareAndSet(FREE, BUSY)) { - dispatcher.getExecutor().execute(this::processMailbox); + if (ready.get() == READY) { + if (newMsg || !highPriorityMsgs.isEmpty() || !normalPriorityMsgs.isEmpty()) { + if (busy.compareAndSet(FREE, BUSY)) { + dispatcher.getExecutor().execute(this::processMailbox); + } else { + log.trace("[{}] MessageBox is busy, new msg: {}", selfId, newMsg); + } + } else { + log.trace("[{}] MessageBox is empty, new msg: {}", selfId, newMsg); + } } else { - log.trace("[{}] MessageBox is busy, new msg: {}", selfId, newMsg); + log.trace("[{}] MessageBox is not ready, new msg: {}", selfId, newMsg); } } private void processMailbox() { boolean noMoreElements = false; for (int i = 0; i < settings.getActorThroughput(); i++) { - TbActorMsg msg = msgs.poll(); + TbActorMsg msg = highPriorityMsgs.poll(); + if (msg == null) { + msg = normalPriorityMsgs.poll(); + } if (msg != null) { try { log.debug("[{}] Going to process message: {}", selfId, msg); @@ -178,6 +196,12 @@ public final class TbActorMailbox implements TbActorCtx { @Override public void tell(TbActorMsg actorMsg) { - enqueue(actorMsg); + enqueue(actorMsg, NORMAL_PRIORITY); } + + @Override + public void tellWithHighPriority(TbActorMsg actorMsg) { + enqueue(actorMsg, HIGH_PRIORITY); + } + } diff --git a/common/actor/src/main/java/org/thingsboard/server/actors/TbActorRef.java b/common/actor/src/main/java/org/thingsboard/server/actors/TbActorRef.java index b3bc983518..5866021bd0 100644 --- a/common/actor/src/main/java/org/thingsboard/server/actors/TbActorRef.java +++ b/common/actor/src/main/java/org/thingsboard/server/actors/TbActorRef.java @@ -23,4 +23,6 @@ public interface TbActorRef { void tell(TbActorMsg actorMsg); + void tellWithHighPriority(TbActorMsg actorMsg); + } diff --git a/common/actor/src/main/java/org/thingsboard/server/actors/TbActorSystem.java b/common/actor/src/main/java/org/thingsboard/server/actors/TbActorSystem.java index 2a5df1c861..89a5e3744e 100644 --- a/common/actor/src/main/java/org/thingsboard/server/actors/TbActorSystem.java +++ b/common/actor/src/main/java/org/thingsboard/server/actors/TbActorSystem.java @@ -36,10 +36,10 @@ public interface TbActorSystem { TbActorRef createChildActor(String dispatcherId, TbActorCreator creator, TbActorId parent); - void tell(TbActorRef target, TbActorMsg actorMsg); - void tell(TbActorId target, TbActorMsg actorMsg); + void tellWithHighPriority(TbActorId target, TbActorMsg actorMsg); + void stop(TbActorRef actorRef); void stop(TbActorId actorId); diff --git a/common/actor/src/test/java/org/thingsboard/server/actors/ActorSystemTest.java b/common/actor/src/test/java/org/thingsboard/server/actors/ActorSystemTest.java index d2d41d5b79..9b8329c2c8 100644 --- a/common/actor/src/test/java/org/thingsboard/server/actors/ActorSystemTest.java +++ b/common/actor/src/test/java/org/thingsboard/server/actors/ActorSystemTest.java @@ -40,19 +40,19 @@ import java.util.concurrent.atomic.AtomicLong; public class ActorSystemTest { public static final String ROOT_DISPATCHER = "root-dispatcher"; - private static final int _1M = 1024 * 1024; + private static final int _100K = 100 * 1024; - private TbActorSystem actorSystem; - private ExecutorService submitPool; + private volatile TbActorSystem actorSystem; + private volatile ExecutorService submitPool; + private int parallelism; @Before public void initActorSystem() { int cores = Runtime.getRuntime().availableProcessors(); - int parallelism = Math.max(1, cores / 2); + parallelism = Math.max(2, cores / 2); TbActorSystemSettings settings = new TbActorSystemSettings(5, parallelism, 42); actorSystem = new DefaultTbActorSystem(settings); submitPool = Executors.newWorkStealingPool(parallelism); - actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newWorkStealingPool(parallelism)); } @After @@ -62,22 +62,44 @@ public class ActorSystemTest { } @Test - public void test10actorsAnd1MMessages() throws InterruptedException { - testActorsAndMessages(10, _1M); + public void test1actorsAnd100KMessages() throws InterruptedException { + actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newWorkStealingPool(parallelism)); + testActorsAndMessages(1, _100K, 1); } @Test - public void test1MActorsAnd10Messages() throws InterruptedException { - testActorsAndMessages(_1M, 10); + public void test10actorsAnd100KMessages() throws InterruptedException { + actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newWorkStealingPool(parallelism)); + testActorsAndMessages(10, _100K, 1); + } + + @Test + public void test100KActorsAnd1Messages5timesSingleThread() throws InterruptedException { + actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newSingleThreadExecutor()); + testActorsAndMessages(_100K, 1, 5); + } + + @Test + public void test100KActorsAnd1Messages5times() throws InterruptedException { + actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newWorkStealingPool(parallelism)); + testActorsAndMessages(_100K, 1, 5); + } + + @Test + public void test100KActorsAnd10Messages() throws InterruptedException { + actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newWorkStealingPool(parallelism)); + testActorsAndMessages(_100K, 10, 1); } @Test public void test1KActorsAnd1KMessages() throws InterruptedException { - testActorsAndMessages(1000, 1000); + actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newWorkStealingPool(parallelism)); + testActorsAndMessages(1000, 1000, 10); } @Test public void testNoMessagesAfterDestroy() throws InterruptedException { + actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newWorkStealingPool(parallelism)); ActorTestCtx testCtx1 = getActorTestCtx(1); ActorTestCtx testCtx2 = getActorTestCtx(1); @@ -86,16 +108,17 @@ public class ActorSystemTest { TbActorRef actorId2 = actorSystem.createRootActor(ROOT_DISPATCHER, new SlowInitActor.SlowInitActorCreator( new TbEntityActorId(new DeviceId(UUID.randomUUID())), testCtx2)); - actorSystem.tell(actorId1, new IntTbActorMsg(42)); - actorSystem.tell(actorId2, new IntTbActorMsg(42)); + actorId1.tell(new IntTbActorMsg(42)); + actorId2.tell(new IntTbActorMsg(42)); actorSystem.stop(actorId1); Assert.assertTrue(testCtx2.getLatch().await(1, TimeUnit.SECONDS)); - Assert.assertFalse(testCtx1.getLatch().await(2, TimeUnit.SECONDS)); + Assert.assertFalse(testCtx1.getLatch().await(1, TimeUnit.SECONDS)); } @Test public void testOneActorCreated() throws InterruptedException { + actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newWorkStealingPool(parallelism)); ActorTestCtx testCtx1 = getActorTestCtx(1); ActorTestCtx testCtx2 = getActorTestCtx(1); TbActorId actorId = new TbEntityActorId(new DeviceId(UUID.randomUUID())); @@ -105,15 +128,16 @@ public class ActorSystemTest { Thread.sleep(1000); actorSystem.tell(actorId, new IntTbActorMsg(42)); - Assert.assertTrue(testCtx1.getLatch().await(3, TimeUnit.SECONDS)); - Assert.assertFalse(testCtx2.getLatch().await(3, TimeUnit.SECONDS)); + Assert.assertTrue(testCtx1.getLatch().await(1, TimeUnit.SECONDS)); + Assert.assertFalse(testCtx2.getLatch().await(1, TimeUnit.SECONDS)); } @Test public void testActorCreatorCalledOnce() throws InterruptedException { + actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newWorkStealingPool(parallelism)); ActorTestCtx testCtx = getActorTestCtx(1); TbActorId actorId = new TbEntityActorId(new DeviceId(UUID.randomUUID())); - for(int i =0; i < 1000; i++) { + for (int i = 0; i < 1000; i++) { submitPool.submit(() -> actorSystem.createRootActor(ROOT_DISPATCHER, new SlowCreateActor.SlowCreateActorCreator(actorId, testCtx))); } Thread.sleep(1000); @@ -125,7 +149,7 @@ public class ActorSystemTest { } - public void testActorsAndMessages(int actorsCount, int msgNumber) throws InterruptedException { + public void testActorsAndMessages(int actorsCount, int msgNumber, int times) throws InterruptedException { Random random = new Random(); int[] randomIntegers = new int[msgNumber]; long sumTmp = 0; @@ -141,32 +165,35 @@ public class ActorSystemTest { List actorRefs = new ArrayList<>(); for (int actorIdx = 0; actorIdx < actorsCount; actorIdx++) { ActorTestCtx testCtx = getActorTestCtx(msgNumber); - actorRefs.add(actorSystem.createRootActor(ROOT_DISPATCHER, new TestRootActor.TestRootActorCreator( new TbEntityActorId(new DeviceId(UUID.randomUUID())), testCtx))); testCtxes.add(testCtx); } - long start = System.nanoTime(); - - for (int i = 0; i < msgNumber; i++) { - int tmp = randomIntegers[i]; - submitPool.execute(() -> actorRefs.forEach(actorId -> actorSystem.tell(actorId, new IntTbActorMsg(tmp)))); - } - log.info("Submitted all messages"); - - testCtxes.forEach(ctx -> { - try { - Assert.assertTrue(ctx.getLatch().await(1, TimeUnit.MINUTES)); - Assert.assertEquals(expected, ctx.getActual().get()); - Assert.assertEquals(msgNumber, ctx.getInvocationCount().get()); - } catch (InterruptedException e) { - e.printStackTrace(); + for (int t = 0; t < times; t++) { + long start = System.nanoTime(); + for (int i = 0; i < msgNumber; i++) { + int tmp = randomIntegers[i]; + submitPool.execute(() -> actorRefs.forEach(actorId -> actorId.tell(new IntTbActorMsg(tmp)))); } - }); - - long duration = System.nanoTime() - start; - log.info("Time spend: {}ns ({} ms)", duration, TimeUnit.NANOSECONDS.toMillis(duration)); + log.info("Submitted all messages"); + testCtxes.forEach(ctx -> { + try { + boolean success = ctx.getLatch().await(1, TimeUnit.MINUTES); + if (!success) { + log.warn("Failed: {}, {}", ctx.getActual().get(), ctx.getInvocationCount().get()); + } + Assert.assertTrue(success); + Assert.assertEquals(expected, ctx.getActual().get()); + Assert.assertEquals(msgNumber, ctx.getInvocationCount().get()); + ctx.clear(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + }); + long duration = System.nanoTime() - start; + log.info("Time spend: {}ns ({} ms)", duration, TimeUnit.NANOSECONDS.toMillis(duration)); + } } private ActorTestCtx getActorTestCtx(int i) { diff --git a/common/actor/src/test/java/org/thingsboard/server/actors/ActorTestCtx.java b/common/actor/src/test/java/org/thingsboard/server/actors/ActorTestCtx.java index 7e898f51ac..69d783b6cb 100644 --- a/common/actor/src/test/java/org/thingsboard/server/actors/ActorTestCtx.java +++ b/common/actor/src/test/java/org/thingsboard/server/actors/ActorTestCtx.java @@ -15,6 +15,7 @@ */ package org.thingsboard.server.actors; +import lombok.AllArgsConstructor; import lombok.Data; import java.util.concurrent.CountDownLatch; @@ -22,10 +23,17 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @Data +@AllArgsConstructor public class ActorTestCtx { - private final CountDownLatch latch; + private volatile CountDownLatch latch; private final AtomicInteger invocationCount; private final int expectedInvocationCount; private final AtomicLong actual; + + public void clear() { + latch = new CountDownLatch(1); + invocationCount.set(0); + actual.set(0L); + } } diff --git a/common/actor/src/test/java/org/thingsboard/server/actors/TestRootActor.java b/common/actor/src/test/java/org/thingsboard/server/actors/TestRootActor.java index a58fa12ef3..881e536b39 100644 --- a/common/actor/src/test/java/org/thingsboard/server/actors/TestRootActor.java +++ b/common/actor/src/test/java/org/thingsboard/server/actors/TestRootActor.java @@ -51,6 +51,8 @@ public class TestRootActor extends AbstractTbActor { if (count == testCtx.getExpectedInvocationCount()) { testCtx.getActual().set(sum); testCtx.getInvocationCount().addAndGet(count); + sum = 0; + count = 0; testCtx.getLatch().countDown(); } } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/kafka/TbKafkaNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/kafka/TbKafkaNode.java index 4955ebc400..967d8e4a50 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/kafka/TbKafkaNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/kafka/TbKafkaNode.java @@ -95,8 +95,20 @@ public class TbKafkaNode implements TbNode { @Override public void onMsg(TbContext ctx, TbMsg msg) { String topic = TbNodeUtils.processPattern(config.getTopicPattern(), msg.getMetaData()); + try { + ctx.getExternalCallExecutor().executeAsync(() -> { + publish(ctx, msg, topic); + return null; + }); + } catch (Exception e) { + ctx.tellFailure(msg, e); + } + } + + protected void publish(TbContext ctx, TbMsg msg, String topic) { try { if (!addMetadataKeyValuesAsKafkaHeaders) { + //TODO: external system executor producer.send(new ProducerRecord<>(topic, msg.getData()), (metadata, e) -> processRecord(ctx, msg, metadata, e)); } else { @@ -105,9 +117,8 @@ public class TbKafkaNode implements TbNode { producer.send(new ProducerRecord<>(topic, null, null, null, msg.getData(), headers), (metadata, e) -> processRecord(ctx, msg, metadata, e)); } - } catch (Exception e) { - ctx.tellFailure(msg, e); + log.debug("[{}] Failed to process message: {}", ctx.getSelfId(), msg, e); } } diff --git a/ui-ngx/src/app/modules/home/models/widget-component.models.ts b/ui-ngx/src/app/modules/home/models/widget-component.models.ts index aa3f04bcad..07858516f0 100644 --- a/ui-ngx/src/app/modules/home/models/widget-component.models.ts +++ b/ui-ngx/src/app/modules/home/models/widget-component.models.ts @@ -73,6 +73,8 @@ import { DialogService } from '@core/services/dialog.service'; import { CustomDialogService } from '@home/components/widget/dialog/custom-dialog.service'; import { DatePipe } from '@angular/common'; import { TranslateService } from '@ngx-translate/core'; +import { PageLink } from '@shared/models/page/page-link'; +import { SortOrder } from '@shared/models/page/sort-order'; export interface IWidgetAction { name: string; @@ -297,6 +299,10 @@ export class WidgetContext { this.widgetTitle = undefined; this.widgetActions = undefined; } + + pageLink(pageSize: number, page: number = 0, textSearch: string = null, sortOrder: SortOrder = null): PageLink { + return new PageLink(pageSize, page, textSearch, sortOrder); + }; } export interface IDynamicWidgetComponent {