From 3358c061da7d013e632cdd76f858f26c5f833471 Mon Sep 17 00:00:00 2001 From: Andrii Shvaika Date: Fri, 5 Jun 2020 10:26:50 +0300 Subject: [PATCH 1/3] Improvements to Mailbox. HighPriorityQueue --- .../server/actors/ActorSystemContext.java | 3 + .../server/actors/app/AppActor.java | 17 ++-- .../RuleChainActorMessageProcessor.java | 6 +- .../server/actors/service/ComponentActor.java | 4 +- .../actors/service/DefaultActorService.java | 12 ++- .../server/actors/tenant/TenantActor.java | 17 ++-- .../queue/DefaultTbCoreConsumerService.java | 2 +- .../DefaultTbRuleEngineConsumerService.java | 2 +- .../rpc/DefaultTbCoreDeviceRpcService.java | 6 +- ...tractMqttServerSideRpcIntegrationTest.java | 2 +- .../server/actors/DefaultTbActorSystem.java | 17 +++- .../server/actors/TbActorMailbox.java | 43 ++++++++--- .../thingsboard/server/actors/TbActorRef.java | 4 +- .../server/actors/TbActorSystem.java | 4 +- .../server/actors/ActorSystemTest.java | 77 +++++++++++-------- .../server/actors/ActorTestCtx.java | 10 ++- .../server/actors/TestRootActor.java | 2 + 17 files changed, 157 insertions(+), 71 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java index 7b9e1d0fd0..de368fd05a 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java @@ -514,6 +514,9 @@ public class ActorSystemContext { appActor.tell(tbActorMsg); } + public void tellWithHighPriority(TbActorMsg tbActorMsg) { + appActor.tellWithHighPriority(tbActorMsg); + } public void schedulePeriodicMsgWithDelay(TbActorRef ctx, TbActorMsg msg, long delayInMs, long periodInMs) { log.debug("Scheduling periodic msg {} every {} ms with delay {} ms", msg, periodInMs, delayInMs); diff --git a/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java b/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java index 3976349de4..d2d36ac899 100644 --- a/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java @@ -5,7 +5,7 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -82,12 +82,14 @@ public class AppActor extends ContextAwareActor { onQueueToRuleEngineMsg((QueueToRuleEngineMsg) msg); break; case TRANSPORT_TO_DEVICE_ACTOR_MSG: + onToDeviceActorMsg((TenantAwareMsg) msg, false); + break; case DEVICE_ATTRIBUTES_UPDATE_TO_DEVICE_ACTOR_MSG: case DEVICE_CREDENTIALS_UPDATE_TO_DEVICE_ACTOR_MSG: case DEVICE_NAME_OR_TYPE_UPDATE_TO_DEVICE_ACTOR_MSG: case DEVICE_RPC_REQUEST_TO_DEVICE_ACTOR_MSG: case SERVER_RPC_RESPONSE_TO_DEVICE_ACTOR_MSG: - onToDeviceActorMsg((TenantAwareMsg) msg); + onToDeviceActorMsg((TenantAwareMsg) msg, true); break; default: return false; @@ -155,15 +157,20 @@ public class AppActor extends ContextAwareActor { } } if (target != null) { - target.tell(msg); + target.tellWithHighPriority(msg); } else { log.debug("[{}] Invalid component lifecycle msg: {}", msg.getTenantId(), msg); } } - private void onToDeviceActorMsg(TenantAwareMsg msg) { + private void onToDeviceActorMsg(TenantAwareMsg msg, boolean priority) { if (!deletedTenants.contains(msg.getTenantId())) { - getOrCreateTenantActor(msg.getTenantId()).tell(msg); + TbActorRef tenantActor = getOrCreateTenantActor(msg.getTenantId()); + if (priority) { + tenantActor.tellWithHighPriority(msg); + } else { + tenantActor.tell(msg); + } } else { if (msg instanceof TransportToDeviceActorMsgWrapper) { ((TransportToDeviceActorMsgWrapper) msg).getCallback().onSuccess(); diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java index 4c59b1fa40..afb50a18aa 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java @@ -128,7 +128,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor { log.trace("[{}][{}] Removing rule node [{}]", tenantId, entityId, ruleNodeId); RuleNodeCtx removed = nodeActors.remove(ruleNodeId); - removed.getSelfActor().tell(new ComponentLifecycleMsg(tenantId, removed.getSelf().getId(), ComponentLifecycleEvent.DELETED)); + removed.getSelfActor().tellWithHighPriority(new ComponentLifecycleMsg(tenantId, removed.getSelf().getId(), ComponentLifecycleEvent.DELETED)); }); initRoutes(ruleChain, ruleNodeList); @@ -155,7 +155,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor actorRef.tell(msg)); + nodeActors.values().stream().map(RuleNodeCtx::getSelfActor).forEach(actorRef -> actorRef.tellWithHighPriority(msg)); } private TbActorRef createRuleNodeActor(TbActorCtx ctx, RuleNode ruleNode) { diff --git a/application/src/main/java/org/thingsboard/server/actors/service/ComponentActor.java b/application/src/main/java/org/thingsboard/server/actors/service/ComponentActor.java index ec4611a8c8..da560cef97 100644 --- a/application/src/main/java/org/thingsboard/server/actors/service/ComponentActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/service/ComponentActor.java @@ -83,7 +83,9 @@ public abstract class ComponentActor actorMsg = encodingService.decode(toCoreNotification.getComponentLifecycleMsg().toByteArray()); if (actorMsg.isPresent()) { log.trace("[{}] Forwarding message to App Actor {}", id, actorMsg.get()); - actorContext.tell(actorMsg.get()); + actorContext.tellWithHighPriority(actorMsg.get()); } callback.onSuccess(); } diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java index 1e5933abad..b98f043a0d 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java @@ -230,7 +230,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService< Optional actorMsg = encodingService.decode(nfMsg.getComponentLifecycleMsg().toByteArray()); if (actorMsg.isPresent()) { log.trace("[{}] Forwarding message to App Actor {}", id, actorMsg.get()); - actorContext.tell(actorMsg.get()); + actorContext.tellWithHighPriority(actorMsg.get()); } callback.onSuccess(); } else if (nfMsg.hasFromDeviceRpcResponse()) { diff --git a/application/src/main/java/org/thingsboard/server/service/rpc/DefaultTbCoreDeviceRpcService.java b/application/src/main/java/org/thingsboard/server/service/rpc/DefaultTbCoreDeviceRpcService.java index 3ac6fea2f7..b5e273c223 100644 --- a/application/src/main/java/org/thingsboard/server/service/rpc/DefaultTbCoreDeviceRpcService.java +++ b/application/src/main/java/org/thingsboard/server/service/rpc/DefaultTbCoreDeviceRpcService.java @@ -121,7 +121,7 @@ public class DefaultTbCoreDeviceRpcService implements TbCoreDeviceRpcService { log.trace("[{}][{}] Processing local rpc call to device actor [{}]", request.getTenantId(), request.getId(), request.getDeviceId()); UUID requestId = request.getId(); localToDeviceRpcRequests.put(requestId, rpcMsg); - actorContext.tell(rpcMsg); + actorContext.tellWithHighPriority(rpcMsg); scheduleToDeviceTimeout(request, requestId); } @@ -175,7 +175,7 @@ public class DefaultTbCoreDeviceRpcService implements TbCoreDeviceRpcService { } private void scheduleToRuleEngineTimeout(ToDeviceRpcRequest request, UUID requestId) { - long timeout = Math.max(0, request.getExpirationTime() - System.currentTimeMillis()); + long timeout = Math.max(0, request.getExpirationTime() - System.currentTimeMillis()) + TimeUnit.SECONDS.toMillis(1); log.trace("[{}] processing to rule engine request.", requestId); scheduler.schedule(() -> { log.trace("[{}] timeout for processing to rule engine request.", requestId); @@ -187,7 +187,7 @@ public class DefaultTbCoreDeviceRpcService implements TbCoreDeviceRpcService { } private void scheduleToDeviceTimeout(ToDeviceRpcRequest request, UUID requestId) { - long timeout = Math.max(0, request.getExpirationTime() - System.currentTimeMillis()); + long timeout = Math.max(0, request.getExpirationTime() - System.currentTimeMillis()) + TimeUnit.SECONDS.toMillis(1); log.trace("[{}] processing to device request.", requestId); scheduler.schedule(() -> { log.trace("[{}] timeout for to device request.", requestId); diff --git a/application/src/test/java/org/thingsboard/server/mqtt/rpc/AbstractMqttServerSideRpcIntegrationTest.java b/application/src/test/java/org/thingsboard/server/mqtt/rpc/AbstractMqttServerSideRpcIntegrationTest.java index 9ddb2c81d9..31f9189014 100644 --- a/application/src/test/java/org/thingsboard/server/mqtt/rpc/AbstractMqttServerSideRpcIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/mqtt/rpc/AbstractMqttServerSideRpcIntegrationTest.java @@ -193,7 +193,7 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractC String setGpioRequest = "{\"method\":\"setGpio\",\"params\":{\"pin\": \"27\",\"value\": 1},\"timeout\": 6000}"; String deviceId = savedDevice.getId().getId().toString(); - doPostAsync("/api/plugins/rpc/twoway/" + deviceId, setGpioRequest, String.class, status().isRequestTimeout(), + doPostAsync("/api/plugins/rpc/twoway/" + deviceId, setGpioRequest, String.class, status().is(409), asyncContextTimeoutToUseRpcPlugin); } diff --git a/common/actor/src/main/java/org/thingsboard/server/actors/DefaultTbActorSystem.java b/common/actor/src/main/java/org/thingsboard/server/actors/DefaultTbActorSystem.java index f291c8e804..b92802d7f3 100644 --- a/common/actor/src/main/java/org/thingsboard/server/actors/DefaultTbActorSystem.java +++ b/common/actor/src/main/java/org/thingsboard/server/actors/DefaultTbActorSystem.java @@ -5,7 +5,7 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -132,19 +132,28 @@ public class DefaultTbActorSystem implements TbActorSystem { } @Override - public void tell(TbActorRef target, TbActorMsg actorMsg) { - target.tell(actorMsg); + public void tellWithHighPriority(TbActorId target, TbActorMsg actorMsg) { + tell(target, actorMsg, true); } @Override public void tell(TbActorId target, TbActorMsg actorMsg) { + tell(target, actorMsg, false); + } + + private void tell(TbActorId target, TbActorMsg actorMsg, boolean highPriority) { TbActorMailbox mailbox = actors.get(target); if (mailbox == null) { throw new TbActorNotRegisteredException(target, "Actor with id [" + target + "] is not registered!"); } - mailbox.enqueue(actorMsg); + if (highPriority) { + mailbox.tellWithHighPriority(actorMsg); + } else { + mailbox.tell(actorMsg); + } } + @Override public void broadcastToChildren(TbActorId parent, TbActorMsg msg) { broadcastToChildren(parent, id -> true, msg); diff --git a/common/actor/src/main/java/org/thingsboard/server/actors/TbActorMailbox.java b/common/actor/src/main/java/org/thingsboard/server/actors/TbActorMailbox.java index 3931fa2535..f19f81b9d0 100644 --- a/common/actor/src/main/java/org/thingsboard/server/actors/TbActorMailbox.java +++ b/common/actor/src/main/java/org/thingsboard/server/actors/TbActorMailbox.java @@ -5,7 +5,7 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -29,6 +29,9 @@ import java.util.function.Supplier; @Slf4j @Data public final class TbActorMailbox implements TbActorCtx { + private static final boolean HIGH_PRIORITY = true; + private static final boolean NORMAL_PRIORITY = false; + private static final boolean FREE = false; private static final boolean BUSY = true; @@ -41,7 +44,8 @@ public final class TbActorMailbox implements TbActorCtx { private final TbActorRef parentRef; private final TbActor actor; private final Dispatcher dispatcher; - private final ConcurrentLinkedQueue msgs = new ConcurrentLinkedQueue<>(); + private final ConcurrentLinkedQueue highPriorityMsgs = new ConcurrentLinkedQueue<>(); + private final ConcurrentLinkedQueue normalPriorityMsgs = new ConcurrentLinkedQueue<>(); private final AtomicBoolean busy = new AtomicBoolean(FREE); private final AtomicBoolean ready = new AtomicBoolean(NOT_READY); private final AtomicBoolean destroyInProgress = new AtomicBoolean(); @@ -78,23 +82,38 @@ public final class TbActorMailbox implements TbActorCtx { } } - public void enqueue(TbActorMsg msg) { - msgs.add(msg); + private void enqueue(TbActorMsg msg, boolean highPriority) { + if (highPriority) { + highPriorityMsgs.add(msg); + } else { + normalPriorityMsgs.add(msg); + } tryProcessQueue(true); } private void tryProcessQueue(boolean newMsg) { - if (ready.get() == READY && (newMsg || !msgs.isEmpty()) && busy.compareAndSet(FREE, BUSY)) { - dispatcher.getExecutor().execute(this::processMailbox); + if (ready.get() == READY) { + if (newMsg || !highPriorityMsgs.isEmpty() || !normalPriorityMsgs.isEmpty()) { + if (busy.compareAndSet(FREE, BUSY)) { + dispatcher.getExecutor().execute(this::processMailbox); + } else { + log.trace("[{}] MessageBox is busy, new msg: {}", selfId, newMsg); + } + } else { + log.trace("[{}] MessageBox is empty, new msg: {}", selfId, newMsg); + } } else { - log.trace("[{}] MessageBox is busy, new msg: {}", selfId, newMsg); + log.trace("[{}] MessageBox is not ready, new msg: {}", selfId, newMsg); } } private void processMailbox() { boolean noMoreElements = false; for (int i = 0; i < settings.getActorThroughput(); i++) { - TbActorMsg msg = msgs.poll(); + TbActorMsg msg = highPriorityMsgs.poll(); + if (msg == null) { + msg = normalPriorityMsgs.poll(); + } if (msg != null) { try { log.debug("[{}] Going to process message: {}", selfId, msg); @@ -178,6 +197,12 @@ public final class TbActorMailbox implements TbActorCtx { @Override public void tell(TbActorMsg actorMsg) { - enqueue(actorMsg); + enqueue(actorMsg, NORMAL_PRIORITY); } + + @Override + public void tellWithHighPriority(TbActorMsg actorMsg) { + enqueue(actorMsg, HIGH_PRIORITY); + } + } diff --git a/common/actor/src/main/java/org/thingsboard/server/actors/TbActorRef.java b/common/actor/src/main/java/org/thingsboard/server/actors/TbActorRef.java index b3bc983518..23639fc6e6 100644 --- a/common/actor/src/main/java/org/thingsboard/server/actors/TbActorRef.java +++ b/common/actor/src/main/java/org/thingsboard/server/actors/TbActorRef.java @@ -5,7 +5,7 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -23,4 +23,6 @@ public interface TbActorRef { void tell(TbActorMsg actorMsg); + void tellWithHighPriority(TbActorMsg actorMsg); + } diff --git a/common/actor/src/main/java/org/thingsboard/server/actors/TbActorSystem.java b/common/actor/src/main/java/org/thingsboard/server/actors/TbActorSystem.java index 2a5df1c861..89a5e3744e 100644 --- a/common/actor/src/main/java/org/thingsboard/server/actors/TbActorSystem.java +++ b/common/actor/src/main/java/org/thingsboard/server/actors/TbActorSystem.java @@ -36,10 +36,10 @@ public interface TbActorSystem { TbActorRef createChildActor(String dispatcherId, TbActorCreator creator, TbActorId parent); - void tell(TbActorRef target, TbActorMsg actorMsg); - void tell(TbActorId target, TbActorMsg actorMsg); + void tellWithHighPriority(TbActorId target, TbActorMsg actorMsg); + void stop(TbActorRef actorRef); void stop(TbActorId actorId); diff --git a/common/actor/src/test/java/org/thingsboard/server/actors/ActorSystemTest.java b/common/actor/src/test/java/org/thingsboard/server/actors/ActorSystemTest.java index d2d41d5b79..b8fb9f52fa 100644 --- a/common/actor/src/test/java/org/thingsboard/server/actors/ActorSystemTest.java +++ b/common/actor/src/test/java/org/thingsboard/server/actors/ActorSystemTest.java @@ -5,7 +5,7 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -42,8 +42,8 @@ public class ActorSystemTest { public static final String ROOT_DISPATCHER = "root-dispatcher"; private static final int _1M = 1024 * 1024; - private TbActorSystem actorSystem; - private ExecutorService submitPool; + private volatile TbActorSystem actorSystem; + private volatile ExecutorService submitPool; @Before public void initActorSystem() { @@ -52,7 +52,11 @@ public class ActorSystemTest { TbActorSystemSettings settings = new TbActorSystemSettings(5, parallelism, 42); actorSystem = new DefaultTbActorSystem(settings); submitPool = Executors.newWorkStealingPool(parallelism); +// actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newCachedThreadPool()); +// actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newFixedThreadPool(parallelism)); actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newWorkStealingPool(parallelism)); +// actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newWorkStealingPool(1)); +// actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newFixedThreadPool(1)); } @After @@ -61,19 +65,29 @@ public class ActorSystemTest { submitPool.shutdownNow(); } + @Test + public void test1actorsAnd1MMessages() throws InterruptedException { + testActorsAndMessages(1, _1M, 5); + } + @Test public void test10actorsAnd1MMessages() throws InterruptedException { - testActorsAndMessages(10, _1M); + testActorsAndMessages(10, _1M, 5); + } + + @Test + public void test1MActorsAnd1Messages5times() throws InterruptedException { + testActorsAndMessages(_1M, 1, 5); } @Test public void test1MActorsAnd10Messages() throws InterruptedException { - testActorsAndMessages(_1M, 10); + testActorsAndMessages(_1M, 10, 1); } @Test public void test1KActorsAnd1KMessages() throws InterruptedException { - testActorsAndMessages(1000, 1000); + testActorsAndMessages(1000, 1000, 10); } @Test @@ -86,8 +100,8 @@ public class ActorSystemTest { TbActorRef actorId2 = actorSystem.createRootActor(ROOT_DISPATCHER, new SlowInitActor.SlowInitActorCreator( new TbEntityActorId(new DeviceId(UUID.randomUUID())), testCtx2)); - actorSystem.tell(actorId1, new IntTbActorMsg(42)); - actorSystem.tell(actorId2, new IntTbActorMsg(42)); + actorId1.tell(new IntTbActorMsg(42)); + actorId2.tell(new IntTbActorMsg(42)); actorSystem.stop(actorId1); Assert.assertTrue(testCtx2.getLatch().await(1, TimeUnit.SECONDS)); @@ -113,7 +127,7 @@ public class ActorSystemTest { public void testActorCreatorCalledOnce() throws InterruptedException { ActorTestCtx testCtx = getActorTestCtx(1); TbActorId actorId = new TbEntityActorId(new DeviceId(UUID.randomUUID())); - for(int i =0; i < 1000; i++) { + for (int i = 0; i < 1000; i++) { submitPool.submit(() -> actorSystem.createRootActor(ROOT_DISPATCHER, new SlowCreateActor.SlowCreateActorCreator(actorId, testCtx))); } Thread.sleep(1000); @@ -125,7 +139,7 @@ public class ActorSystemTest { } - public void testActorsAndMessages(int actorsCount, int msgNumber) throws InterruptedException { + public void testActorsAndMessages(int actorsCount, int msgNumber, int times) throws InterruptedException { Random random = new Random(); int[] randomIntegers = new int[msgNumber]; long sumTmp = 0; @@ -141,32 +155,35 @@ public class ActorSystemTest { List actorRefs = new ArrayList<>(); for (int actorIdx = 0; actorIdx < actorsCount; actorIdx++) { ActorTestCtx testCtx = getActorTestCtx(msgNumber); - actorRefs.add(actorSystem.createRootActor(ROOT_DISPATCHER, new TestRootActor.TestRootActorCreator( new TbEntityActorId(new DeviceId(UUID.randomUUID())), testCtx))); testCtxes.add(testCtx); } - long start = System.nanoTime(); - - for (int i = 0; i < msgNumber; i++) { - int tmp = randomIntegers[i]; - submitPool.execute(() -> actorRefs.forEach(actorId -> actorSystem.tell(actorId, new IntTbActorMsg(tmp)))); - } - log.info("Submitted all messages"); - - testCtxes.forEach(ctx -> { - try { - Assert.assertTrue(ctx.getLatch().await(1, TimeUnit.MINUTES)); - Assert.assertEquals(expected, ctx.getActual().get()); - Assert.assertEquals(msgNumber, ctx.getInvocationCount().get()); - } catch (InterruptedException e) { - e.printStackTrace(); + for (int t = 0; t < times; t++) { + long start = System.nanoTime(); + for (int i = 0; i < msgNumber; i++) { + int tmp = randomIntegers[i]; + submitPool.execute(() -> actorRefs.forEach(actorId -> actorId.tell(new IntTbActorMsg(tmp)))); } - }); - - long duration = System.nanoTime() - start; - log.info("Time spend: {}ns ({} ms)", duration, TimeUnit.NANOSECONDS.toMillis(duration)); + log.info("Submitted all messages"); + testCtxes.forEach(ctx -> { + try { + boolean success = ctx.getLatch().await(1, TimeUnit.MINUTES); + if(!success){ + log.warn("Failed: {}, {}", ctx.getActual().get(), ctx.getInvocationCount().get()); + } + Assert.assertTrue(success); + Assert.assertEquals(expected, ctx.getActual().get()); + Assert.assertEquals(msgNumber, ctx.getInvocationCount().get()); + ctx.clear(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + }); + long duration = System.nanoTime() - start; + log.info("Time spend: {}ns ({} ms)", duration, TimeUnit.NANOSECONDS.toMillis(duration)); + } } private ActorTestCtx getActorTestCtx(int i) { diff --git a/common/actor/src/test/java/org/thingsboard/server/actors/ActorTestCtx.java b/common/actor/src/test/java/org/thingsboard/server/actors/ActorTestCtx.java index 7e898f51ac..69d783b6cb 100644 --- a/common/actor/src/test/java/org/thingsboard/server/actors/ActorTestCtx.java +++ b/common/actor/src/test/java/org/thingsboard/server/actors/ActorTestCtx.java @@ -15,6 +15,7 @@ */ package org.thingsboard.server.actors; +import lombok.AllArgsConstructor; import lombok.Data; import java.util.concurrent.CountDownLatch; @@ -22,10 +23,17 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @Data +@AllArgsConstructor public class ActorTestCtx { - private final CountDownLatch latch; + private volatile CountDownLatch latch; private final AtomicInteger invocationCount; private final int expectedInvocationCount; private final AtomicLong actual; + + public void clear() { + latch = new CountDownLatch(1); + invocationCount.set(0); + actual.set(0L); + } } diff --git a/common/actor/src/test/java/org/thingsboard/server/actors/TestRootActor.java b/common/actor/src/test/java/org/thingsboard/server/actors/TestRootActor.java index a58fa12ef3..881e536b39 100644 --- a/common/actor/src/test/java/org/thingsboard/server/actors/TestRootActor.java +++ b/common/actor/src/test/java/org/thingsboard/server/actors/TestRootActor.java @@ -51,6 +51,8 @@ public class TestRootActor extends AbstractTbActor { if (count == testCtx.getExpectedInvocationCount()) { testCtx.getActual().set(sum); testCtx.getInvocationCount().addAndGet(count); + sum = 0; + count = 0; testCtx.getLatch().countDown(); } } From cc4f746b1d343d81d0383e037bd0c9eaff09b9bf Mon Sep 17 00:00:00 2001 From: Andrii Shvaika Date: Fri, 5 Jun 2020 12:54:27 +0300 Subject: [PATCH 2/3] Using Exectutor in Kafka Node. NEVER use Fork-Join pool with parallelism 1 --- .../server/actors/app/AppActor.java | 2 +- .../actors/service/DefaultActorService.java | 15 +++--- .../server/actors/tenant/TenantActor.java | 2 +- .../rpc/DefaultTbRuleEngineRpcService.java | 2 +- .../server/mqtt/MqttSqlTestSuite.java | 4 +- ...tractMqttServerSideRpcIntegrationTest.java | 2 +- .../server/actors/DefaultTbActorSystem.java | 2 +- .../server/actors/TbActorMailbox.java | 3 +- .../thingsboard/server/actors/TbActorRef.java | 2 +- .../server/actors/ActorSystemTest.java | 50 +++++++++++-------- .../rule/engine/kafka/TbKafkaNode.java | 15 +++++- 11 files changed, 61 insertions(+), 38 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java b/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java index d2d36ac899..953188f3f7 100644 --- a/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java @@ -5,7 +5,7 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, diff --git a/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java b/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java index bfe0aa0a60..2d3775c9f5 100644 --- a/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java +++ b/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java @@ -5,7 +5,7 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -21,6 +21,7 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.context.event.ApplicationReadyEvent; import org.springframework.context.event.EventListener; import org.springframework.stereotype.Service; +import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.server.actors.ActorSystemContext; import org.thingsboard.server.actors.DefaultTbActorSystem; import org.thingsboard.server.actors.TbActorId; @@ -83,10 +84,10 @@ public class DefaultActorService implements ActorService { TbActorSystemSettings settings = new TbActorSystemSettings(actorThroughput, schedulerPoolSize, maxActorInitAttempts); system = new DefaultTbActorSystem(settings); - system.createDispatcher(APP_DISPATCHER_NAME, initDispatcherExecutor(appDispatcherSize)); - system.createDispatcher(TENANT_DISPATCHER_NAME, initDispatcherExecutor(tenantDispatcherSize)); - system.createDispatcher(DEVICE_DISPATCHER_NAME, initDispatcherExecutor(deviceDispatcherSize)); - system.createDispatcher(RULE_DISPATCHER_NAME, initDispatcherExecutor(ruleDispatcherSize)); + system.createDispatcher(APP_DISPATCHER_NAME, initDispatcherExecutor(APP_DISPATCHER_NAME, appDispatcherSize)); + system.createDispatcher(TENANT_DISPATCHER_NAME, initDispatcherExecutor(TENANT_DISPATCHER_NAME, tenantDispatcherSize)); + system.createDispatcher(DEVICE_DISPATCHER_NAME, initDispatcherExecutor(DEVICE_DISPATCHER_NAME, deviceDispatcherSize)); + system.createDispatcher(RULE_DISPATCHER_NAME, initDispatcherExecutor(RULE_DISPATCHER_NAME, ruleDispatcherSize)); actorContext.setActorSystem(system); @@ -99,13 +100,13 @@ public class DefaultActorService implements ActorService { log.info("Actor system initialized."); } - private ExecutorService initDispatcherExecutor(int poolSize) { + private ExecutorService initDispatcherExecutor(String dispatcherName, int poolSize) { if (poolSize == 0) { int cores = Runtime.getRuntime().availableProcessors(); poolSize = Math.max(1, cores / 2); } if (poolSize == 1) { - return Executors.newFixedThreadPool(1); + return Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName(dispatcherName)); } else { return Executors.newWorkStealingPool(poolSize); } diff --git a/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java b/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java index 2fa9164de2..b46b5a25c0 100644 --- a/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java @@ -5,7 +5,7 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, diff --git a/application/src/main/java/org/thingsboard/server/service/rpc/DefaultTbRuleEngineRpcService.java b/application/src/main/java/org/thingsboard/server/service/rpc/DefaultTbRuleEngineRpcService.java index 0ec730b7dc..80d47a84e8 100644 --- a/application/src/main/java/org/thingsboard/server/service/rpc/DefaultTbRuleEngineRpcService.java +++ b/application/src/main/java/org/thingsboard/server/service/rpc/DefaultTbRuleEngineRpcService.java @@ -164,7 +164,7 @@ public class DefaultTbRuleEngineRpcService implements TbRuleEngineDeviceRpcServi } private void scheduleTimeout(ToDeviceRpcRequest request, UUID requestId) { - long timeout = Math.max(0, request.getExpirationTime() - System.currentTimeMillis()); + long timeout = Math.max(0, request.getExpirationTime() - System.currentTimeMillis()) + TimeUnit.SECONDS.toMillis(1); log.trace("[{}] processing the request: [{}]", this.hashCode(), requestId); scheduler.schedule(() -> { log.trace("[{}] timeout the request: [{}]", this.hashCode(), requestId); diff --git a/application/src/test/java/org/thingsboard/server/mqtt/MqttSqlTestSuite.java b/application/src/test/java/org/thingsboard/server/mqtt/MqttSqlTestSuite.java index 69a6e99353..0e7234c716 100644 --- a/application/src/test/java/org/thingsboard/server/mqtt/MqttSqlTestSuite.java +++ b/application/src/test/java/org/thingsboard/server/mqtt/MqttSqlTestSuite.java @@ -26,7 +26,9 @@ import java.util.Arrays; @RunWith(ClasspathSuite.class) @ClasspathSuite.ClassnameFilters({ - "org.thingsboard.server.mqtt.rpc.sql.*Test", "org.thingsboard.server.mqtt.telemetry.sql.*Test"}) + "org.thingsboard.server.mqtt.rpc.sql.*Test", + "org.thingsboard.server.mqtt.telemetry.sql.*Test" +}) public class MqttSqlTestSuite { @ClassRule diff --git a/application/src/test/java/org/thingsboard/server/mqtt/rpc/AbstractMqttServerSideRpcIntegrationTest.java b/application/src/test/java/org/thingsboard/server/mqtt/rpc/AbstractMqttServerSideRpcIntegrationTest.java index 31f9189014..54bd9a4836 100644 --- a/application/src/test/java/org/thingsboard/server/mqtt/rpc/AbstractMqttServerSideRpcIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/mqtt/rpc/AbstractMqttServerSideRpcIntegrationTest.java @@ -136,7 +136,7 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractC String setGpioRequest = "{\"method\":\"setGpio\",\"params\":{\"pin\": \"24\",\"value\": 1},\"timeout\": 6000}"; String deviceId = savedDevice.getId().getId().toString(); - doPostAsync("/api/plugins/rpc/oneway/" + deviceId, setGpioRequest, String.class, status().isRequestTimeout(), + doPostAsync("/api/plugins/rpc/oneway/" + deviceId, setGpioRequest, String.class, status().is(409), asyncContextTimeoutToUseRpcPlugin); } diff --git a/common/actor/src/main/java/org/thingsboard/server/actors/DefaultTbActorSystem.java b/common/actor/src/main/java/org/thingsboard/server/actors/DefaultTbActorSystem.java index b92802d7f3..a642ccec69 100644 --- a/common/actor/src/main/java/org/thingsboard/server/actors/DefaultTbActorSystem.java +++ b/common/actor/src/main/java/org/thingsboard/server/actors/DefaultTbActorSystem.java @@ -5,7 +5,7 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, diff --git a/common/actor/src/main/java/org/thingsboard/server/actors/TbActorMailbox.java b/common/actor/src/main/java/org/thingsboard/server/actors/TbActorMailbox.java index f19f81b9d0..279332adf3 100644 --- a/common/actor/src/main/java/org/thingsboard/server/actors/TbActorMailbox.java +++ b/common/actor/src/main/java/org/thingsboard/server/actors/TbActorMailbox.java @@ -5,7 +5,7 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -54,7 +54,6 @@ public final class TbActorMailbox implements TbActorCtx { dispatcher.getExecutor().execute(() -> tryInit(1)); } - private void tryInit(int attempt) { try { log.debug("[{}] Trying to init actor, attempt: {}", selfId, attempt); diff --git a/common/actor/src/main/java/org/thingsboard/server/actors/TbActorRef.java b/common/actor/src/main/java/org/thingsboard/server/actors/TbActorRef.java index 23639fc6e6..5866021bd0 100644 --- a/common/actor/src/main/java/org/thingsboard/server/actors/TbActorRef.java +++ b/common/actor/src/main/java/org/thingsboard/server/actors/TbActorRef.java @@ -5,7 +5,7 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, diff --git a/common/actor/src/test/java/org/thingsboard/server/actors/ActorSystemTest.java b/common/actor/src/test/java/org/thingsboard/server/actors/ActorSystemTest.java index b8fb9f52fa..9b8329c2c8 100644 --- a/common/actor/src/test/java/org/thingsboard/server/actors/ActorSystemTest.java +++ b/common/actor/src/test/java/org/thingsboard/server/actors/ActorSystemTest.java @@ -5,7 +5,7 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -40,23 +40,19 @@ import java.util.concurrent.atomic.AtomicLong; public class ActorSystemTest { public static final String ROOT_DISPATCHER = "root-dispatcher"; - private static final int _1M = 1024 * 1024; + private static final int _100K = 100 * 1024; private volatile TbActorSystem actorSystem; private volatile ExecutorService submitPool; + private int parallelism; @Before public void initActorSystem() { int cores = Runtime.getRuntime().availableProcessors(); - int parallelism = Math.max(1, cores / 2); + parallelism = Math.max(2, cores / 2); TbActorSystemSettings settings = new TbActorSystemSettings(5, parallelism, 42); actorSystem = new DefaultTbActorSystem(settings); submitPool = Executors.newWorkStealingPool(parallelism); -// actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newCachedThreadPool()); -// actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newFixedThreadPool(parallelism)); - actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newWorkStealingPool(parallelism)); -// actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newWorkStealingPool(1)); -// actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newFixedThreadPool(1)); } @After @@ -66,32 +62,44 @@ public class ActorSystemTest { } @Test - public void test1actorsAnd1MMessages() throws InterruptedException { - testActorsAndMessages(1, _1M, 5); + public void test1actorsAnd100KMessages() throws InterruptedException { + actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newWorkStealingPool(parallelism)); + testActorsAndMessages(1, _100K, 1); } @Test - public void test10actorsAnd1MMessages() throws InterruptedException { - testActorsAndMessages(10, _1M, 5); + public void test10actorsAnd100KMessages() throws InterruptedException { + actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newWorkStealingPool(parallelism)); + testActorsAndMessages(10, _100K, 1); } @Test - public void test1MActorsAnd1Messages5times() throws InterruptedException { - testActorsAndMessages(_1M, 1, 5); + public void test100KActorsAnd1Messages5timesSingleThread() throws InterruptedException { + actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newSingleThreadExecutor()); + testActorsAndMessages(_100K, 1, 5); } @Test - public void test1MActorsAnd10Messages() throws InterruptedException { - testActorsAndMessages(_1M, 10, 1); + public void test100KActorsAnd1Messages5times() throws InterruptedException { + actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newWorkStealingPool(parallelism)); + testActorsAndMessages(_100K, 1, 5); + } + + @Test + public void test100KActorsAnd10Messages() throws InterruptedException { + actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newWorkStealingPool(parallelism)); + testActorsAndMessages(_100K, 10, 1); } @Test public void test1KActorsAnd1KMessages() throws InterruptedException { + actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newWorkStealingPool(parallelism)); testActorsAndMessages(1000, 1000, 10); } @Test public void testNoMessagesAfterDestroy() throws InterruptedException { + actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newWorkStealingPool(parallelism)); ActorTestCtx testCtx1 = getActorTestCtx(1); ActorTestCtx testCtx2 = getActorTestCtx(1); @@ -105,11 +113,12 @@ public class ActorSystemTest { actorSystem.stop(actorId1); Assert.assertTrue(testCtx2.getLatch().await(1, TimeUnit.SECONDS)); - Assert.assertFalse(testCtx1.getLatch().await(2, TimeUnit.SECONDS)); + Assert.assertFalse(testCtx1.getLatch().await(1, TimeUnit.SECONDS)); } @Test public void testOneActorCreated() throws InterruptedException { + actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newWorkStealingPool(parallelism)); ActorTestCtx testCtx1 = getActorTestCtx(1); ActorTestCtx testCtx2 = getActorTestCtx(1); TbActorId actorId = new TbEntityActorId(new DeviceId(UUID.randomUUID())); @@ -119,12 +128,13 @@ public class ActorSystemTest { Thread.sleep(1000); actorSystem.tell(actorId, new IntTbActorMsg(42)); - Assert.assertTrue(testCtx1.getLatch().await(3, TimeUnit.SECONDS)); - Assert.assertFalse(testCtx2.getLatch().await(3, TimeUnit.SECONDS)); + Assert.assertTrue(testCtx1.getLatch().await(1, TimeUnit.SECONDS)); + Assert.assertFalse(testCtx2.getLatch().await(1, TimeUnit.SECONDS)); } @Test public void testActorCreatorCalledOnce() throws InterruptedException { + actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newWorkStealingPool(parallelism)); ActorTestCtx testCtx = getActorTestCtx(1); TbActorId actorId = new TbEntityActorId(new DeviceId(UUID.randomUUID())); for (int i = 0; i < 1000; i++) { @@ -170,7 +180,7 @@ public class ActorSystemTest { testCtxes.forEach(ctx -> { try { boolean success = ctx.getLatch().await(1, TimeUnit.MINUTES); - if(!success){ + if (!success) { log.warn("Failed: {}, {}", ctx.getActual().get(), ctx.getInvocationCount().get()); } Assert.assertTrue(success); diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/kafka/TbKafkaNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/kafka/TbKafkaNode.java index 4955ebc400..967d8e4a50 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/kafka/TbKafkaNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/kafka/TbKafkaNode.java @@ -95,8 +95,20 @@ public class TbKafkaNode implements TbNode { @Override public void onMsg(TbContext ctx, TbMsg msg) { String topic = TbNodeUtils.processPattern(config.getTopicPattern(), msg.getMetaData()); + try { + ctx.getExternalCallExecutor().executeAsync(() -> { + publish(ctx, msg, topic); + return null; + }); + } catch (Exception e) { + ctx.tellFailure(msg, e); + } + } + + protected void publish(TbContext ctx, TbMsg msg, String topic) { try { if (!addMetadataKeyValuesAsKafkaHeaders) { + //TODO: external system executor producer.send(new ProducerRecord<>(topic, msg.getData()), (metadata, e) -> processRecord(ctx, msg, metadata, e)); } else { @@ -105,9 +117,8 @@ public class TbKafkaNode implements TbNode { producer.send(new ProducerRecord<>(topic, null, null, null, msg.getData(), headers), (metadata, e) -> processRecord(ctx, msg, metadata, e)); } - } catch (Exception e) { - ctx.tellFailure(msg, e); + log.debug("[{}] Failed to process message: {}", ctx.getSelfId(), msg, e); } } From 961c06ad491cd499e9659dc2141ed14035c4a6b2 Mon Sep 17 00:00:00 2001 From: Vladyslav_Prykhodko Date: Fri, 5 Jun 2020 15:36:28 +0300 Subject: [PATCH 3/3] Add create constructor PageLik --- .../src/app/modules/home/models/widget-component.models.ts | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/ui-ngx/src/app/modules/home/models/widget-component.models.ts b/ui-ngx/src/app/modules/home/models/widget-component.models.ts index aa3f04bcad..07858516f0 100644 --- a/ui-ngx/src/app/modules/home/models/widget-component.models.ts +++ b/ui-ngx/src/app/modules/home/models/widget-component.models.ts @@ -73,6 +73,8 @@ import { DialogService } from '@core/services/dialog.service'; import { CustomDialogService } from '@home/components/widget/dialog/custom-dialog.service'; import { DatePipe } from '@angular/common'; import { TranslateService } from '@ngx-translate/core'; +import { PageLink } from '@shared/models/page/page-link'; +import { SortOrder } from '@shared/models/page/sort-order'; export interface IWidgetAction { name: string; @@ -297,6 +299,10 @@ export class WidgetContext { this.widgetTitle = undefined; this.widgetActions = undefined; } + + pageLink(pageSize: number, page: number = 0, textSearch: string = null, sortOrder: SortOrder = null): PageLink { + return new PageLink(pageSize, page, textSearch, sortOrder); + }; } export interface IDynamicWidgetComponent {