From cc4f746b1d343d81d0383e037bd0c9eaff09b9bf Mon Sep 17 00:00:00 2001 From: Andrii Shvaika Date: Fri, 5 Jun 2020 12:54:27 +0300 Subject: [PATCH] Using Exectutor in Kafka Node. NEVER use Fork-Join pool with parallelism 1 --- .../server/actors/app/AppActor.java | 2 +- .../actors/service/DefaultActorService.java | 15 +++--- .../server/actors/tenant/TenantActor.java | 2 +- .../rpc/DefaultTbRuleEngineRpcService.java | 2 +- .../server/mqtt/MqttSqlTestSuite.java | 4 +- ...tractMqttServerSideRpcIntegrationTest.java | 2 +- .../server/actors/DefaultTbActorSystem.java | 2 +- .../server/actors/TbActorMailbox.java | 3 +- .../thingsboard/server/actors/TbActorRef.java | 2 +- .../server/actors/ActorSystemTest.java | 50 +++++++++++-------- .../rule/engine/kafka/TbKafkaNode.java | 15 +++++- 11 files changed, 61 insertions(+), 38 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java b/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java index d2d36ac899..953188f3f7 100644 --- a/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java @@ -5,7 +5,7 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, diff --git a/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java b/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java index bfe0aa0a60..2d3775c9f5 100644 --- a/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java +++ b/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java @@ -5,7 +5,7 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -21,6 +21,7 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.context.event.ApplicationReadyEvent; import org.springframework.context.event.EventListener; import org.springframework.stereotype.Service; +import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.server.actors.ActorSystemContext; import org.thingsboard.server.actors.DefaultTbActorSystem; import org.thingsboard.server.actors.TbActorId; @@ -83,10 +84,10 @@ public class DefaultActorService implements ActorService { TbActorSystemSettings settings = new TbActorSystemSettings(actorThroughput, schedulerPoolSize, maxActorInitAttempts); system = new DefaultTbActorSystem(settings); - system.createDispatcher(APP_DISPATCHER_NAME, initDispatcherExecutor(appDispatcherSize)); - system.createDispatcher(TENANT_DISPATCHER_NAME, initDispatcherExecutor(tenantDispatcherSize)); - system.createDispatcher(DEVICE_DISPATCHER_NAME, initDispatcherExecutor(deviceDispatcherSize)); - system.createDispatcher(RULE_DISPATCHER_NAME, initDispatcherExecutor(ruleDispatcherSize)); + system.createDispatcher(APP_DISPATCHER_NAME, initDispatcherExecutor(APP_DISPATCHER_NAME, appDispatcherSize)); + system.createDispatcher(TENANT_DISPATCHER_NAME, initDispatcherExecutor(TENANT_DISPATCHER_NAME, tenantDispatcherSize)); + system.createDispatcher(DEVICE_DISPATCHER_NAME, initDispatcherExecutor(DEVICE_DISPATCHER_NAME, deviceDispatcherSize)); + system.createDispatcher(RULE_DISPATCHER_NAME, initDispatcherExecutor(RULE_DISPATCHER_NAME, ruleDispatcherSize)); actorContext.setActorSystem(system); @@ -99,13 +100,13 @@ public class DefaultActorService implements ActorService { log.info("Actor system initialized."); } - private ExecutorService initDispatcherExecutor(int poolSize) { + private ExecutorService initDispatcherExecutor(String dispatcherName, int poolSize) { if (poolSize == 0) { int cores = Runtime.getRuntime().availableProcessors(); poolSize = Math.max(1, cores / 2); } if (poolSize == 1) { - return Executors.newFixedThreadPool(1); + return Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName(dispatcherName)); } else { return Executors.newWorkStealingPool(poolSize); } diff --git a/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java b/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java index 2fa9164de2..b46b5a25c0 100644 --- a/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java @@ -5,7 +5,7 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, diff --git a/application/src/main/java/org/thingsboard/server/service/rpc/DefaultTbRuleEngineRpcService.java b/application/src/main/java/org/thingsboard/server/service/rpc/DefaultTbRuleEngineRpcService.java index 0ec730b7dc..80d47a84e8 100644 --- a/application/src/main/java/org/thingsboard/server/service/rpc/DefaultTbRuleEngineRpcService.java +++ b/application/src/main/java/org/thingsboard/server/service/rpc/DefaultTbRuleEngineRpcService.java @@ -164,7 +164,7 @@ public class DefaultTbRuleEngineRpcService implements TbRuleEngineDeviceRpcServi } private void scheduleTimeout(ToDeviceRpcRequest request, UUID requestId) { - long timeout = Math.max(0, request.getExpirationTime() - System.currentTimeMillis()); + long timeout = Math.max(0, request.getExpirationTime() - System.currentTimeMillis()) + TimeUnit.SECONDS.toMillis(1); log.trace("[{}] processing the request: [{}]", this.hashCode(), requestId); scheduler.schedule(() -> { log.trace("[{}] timeout the request: [{}]", this.hashCode(), requestId); diff --git a/application/src/test/java/org/thingsboard/server/mqtt/MqttSqlTestSuite.java b/application/src/test/java/org/thingsboard/server/mqtt/MqttSqlTestSuite.java index 69a6e99353..0e7234c716 100644 --- a/application/src/test/java/org/thingsboard/server/mqtt/MqttSqlTestSuite.java +++ b/application/src/test/java/org/thingsboard/server/mqtt/MqttSqlTestSuite.java @@ -26,7 +26,9 @@ import java.util.Arrays; @RunWith(ClasspathSuite.class) @ClasspathSuite.ClassnameFilters({ - "org.thingsboard.server.mqtt.rpc.sql.*Test", "org.thingsboard.server.mqtt.telemetry.sql.*Test"}) + "org.thingsboard.server.mqtt.rpc.sql.*Test", + "org.thingsboard.server.mqtt.telemetry.sql.*Test" +}) public class MqttSqlTestSuite { @ClassRule diff --git a/application/src/test/java/org/thingsboard/server/mqtt/rpc/AbstractMqttServerSideRpcIntegrationTest.java b/application/src/test/java/org/thingsboard/server/mqtt/rpc/AbstractMqttServerSideRpcIntegrationTest.java index 31f9189014..54bd9a4836 100644 --- a/application/src/test/java/org/thingsboard/server/mqtt/rpc/AbstractMqttServerSideRpcIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/mqtt/rpc/AbstractMqttServerSideRpcIntegrationTest.java @@ -136,7 +136,7 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractC String setGpioRequest = "{\"method\":\"setGpio\",\"params\":{\"pin\": \"24\",\"value\": 1},\"timeout\": 6000}"; String deviceId = savedDevice.getId().getId().toString(); - doPostAsync("/api/plugins/rpc/oneway/" + deviceId, setGpioRequest, String.class, status().isRequestTimeout(), + doPostAsync("/api/plugins/rpc/oneway/" + deviceId, setGpioRequest, String.class, status().is(409), asyncContextTimeoutToUseRpcPlugin); } diff --git a/common/actor/src/main/java/org/thingsboard/server/actors/DefaultTbActorSystem.java b/common/actor/src/main/java/org/thingsboard/server/actors/DefaultTbActorSystem.java index b92802d7f3..a642ccec69 100644 --- a/common/actor/src/main/java/org/thingsboard/server/actors/DefaultTbActorSystem.java +++ b/common/actor/src/main/java/org/thingsboard/server/actors/DefaultTbActorSystem.java @@ -5,7 +5,7 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, diff --git a/common/actor/src/main/java/org/thingsboard/server/actors/TbActorMailbox.java b/common/actor/src/main/java/org/thingsboard/server/actors/TbActorMailbox.java index f19f81b9d0..279332adf3 100644 --- a/common/actor/src/main/java/org/thingsboard/server/actors/TbActorMailbox.java +++ b/common/actor/src/main/java/org/thingsboard/server/actors/TbActorMailbox.java @@ -5,7 +5,7 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -54,7 +54,6 @@ public final class TbActorMailbox implements TbActorCtx { dispatcher.getExecutor().execute(() -> tryInit(1)); } - private void tryInit(int attempt) { try { log.debug("[{}] Trying to init actor, attempt: {}", selfId, attempt); diff --git a/common/actor/src/main/java/org/thingsboard/server/actors/TbActorRef.java b/common/actor/src/main/java/org/thingsboard/server/actors/TbActorRef.java index 23639fc6e6..5866021bd0 100644 --- a/common/actor/src/main/java/org/thingsboard/server/actors/TbActorRef.java +++ b/common/actor/src/main/java/org/thingsboard/server/actors/TbActorRef.java @@ -5,7 +5,7 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, diff --git a/common/actor/src/test/java/org/thingsboard/server/actors/ActorSystemTest.java b/common/actor/src/test/java/org/thingsboard/server/actors/ActorSystemTest.java index b8fb9f52fa..9b8329c2c8 100644 --- a/common/actor/src/test/java/org/thingsboard/server/actors/ActorSystemTest.java +++ b/common/actor/src/test/java/org/thingsboard/server/actors/ActorSystemTest.java @@ -5,7 +5,7 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -40,23 +40,19 @@ import java.util.concurrent.atomic.AtomicLong; public class ActorSystemTest { public static final String ROOT_DISPATCHER = "root-dispatcher"; - private static final int _1M = 1024 * 1024; + private static final int _100K = 100 * 1024; private volatile TbActorSystem actorSystem; private volatile ExecutorService submitPool; + private int parallelism; @Before public void initActorSystem() { int cores = Runtime.getRuntime().availableProcessors(); - int parallelism = Math.max(1, cores / 2); + parallelism = Math.max(2, cores / 2); TbActorSystemSettings settings = new TbActorSystemSettings(5, parallelism, 42); actorSystem = new DefaultTbActorSystem(settings); submitPool = Executors.newWorkStealingPool(parallelism); -// actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newCachedThreadPool()); -// actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newFixedThreadPool(parallelism)); - actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newWorkStealingPool(parallelism)); -// actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newWorkStealingPool(1)); -// actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newFixedThreadPool(1)); } @After @@ -66,32 +62,44 @@ public class ActorSystemTest { } @Test - public void test1actorsAnd1MMessages() throws InterruptedException { - testActorsAndMessages(1, _1M, 5); + public void test1actorsAnd100KMessages() throws InterruptedException { + actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newWorkStealingPool(parallelism)); + testActorsAndMessages(1, _100K, 1); } @Test - public void test10actorsAnd1MMessages() throws InterruptedException { - testActorsAndMessages(10, _1M, 5); + public void test10actorsAnd100KMessages() throws InterruptedException { + actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newWorkStealingPool(parallelism)); + testActorsAndMessages(10, _100K, 1); } @Test - public void test1MActorsAnd1Messages5times() throws InterruptedException { - testActorsAndMessages(_1M, 1, 5); + public void test100KActorsAnd1Messages5timesSingleThread() throws InterruptedException { + actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newSingleThreadExecutor()); + testActorsAndMessages(_100K, 1, 5); } @Test - public void test1MActorsAnd10Messages() throws InterruptedException { - testActorsAndMessages(_1M, 10, 1); + public void test100KActorsAnd1Messages5times() throws InterruptedException { + actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newWorkStealingPool(parallelism)); + testActorsAndMessages(_100K, 1, 5); + } + + @Test + public void test100KActorsAnd10Messages() throws InterruptedException { + actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newWorkStealingPool(parallelism)); + testActorsAndMessages(_100K, 10, 1); } @Test public void test1KActorsAnd1KMessages() throws InterruptedException { + actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newWorkStealingPool(parallelism)); testActorsAndMessages(1000, 1000, 10); } @Test public void testNoMessagesAfterDestroy() throws InterruptedException { + actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newWorkStealingPool(parallelism)); ActorTestCtx testCtx1 = getActorTestCtx(1); ActorTestCtx testCtx2 = getActorTestCtx(1); @@ -105,11 +113,12 @@ public class ActorSystemTest { actorSystem.stop(actorId1); Assert.assertTrue(testCtx2.getLatch().await(1, TimeUnit.SECONDS)); - Assert.assertFalse(testCtx1.getLatch().await(2, TimeUnit.SECONDS)); + Assert.assertFalse(testCtx1.getLatch().await(1, TimeUnit.SECONDS)); } @Test public void testOneActorCreated() throws InterruptedException { + actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newWorkStealingPool(parallelism)); ActorTestCtx testCtx1 = getActorTestCtx(1); ActorTestCtx testCtx2 = getActorTestCtx(1); TbActorId actorId = new TbEntityActorId(new DeviceId(UUID.randomUUID())); @@ -119,12 +128,13 @@ public class ActorSystemTest { Thread.sleep(1000); actorSystem.tell(actorId, new IntTbActorMsg(42)); - Assert.assertTrue(testCtx1.getLatch().await(3, TimeUnit.SECONDS)); - Assert.assertFalse(testCtx2.getLatch().await(3, TimeUnit.SECONDS)); + Assert.assertTrue(testCtx1.getLatch().await(1, TimeUnit.SECONDS)); + Assert.assertFalse(testCtx2.getLatch().await(1, TimeUnit.SECONDS)); } @Test public void testActorCreatorCalledOnce() throws InterruptedException { + actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newWorkStealingPool(parallelism)); ActorTestCtx testCtx = getActorTestCtx(1); TbActorId actorId = new TbEntityActorId(new DeviceId(UUID.randomUUID())); for (int i = 0; i < 1000; i++) { @@ -170,7 +180,7 @@ public class ActorSystemTest { testCtxes.forEach(ctx -> { try { boolean success = ctx.getLatch().await(1, TimeUnit.MINUTES); - if(!success){ + if (!success) { log.warn("Failed: {}, {}", ctx.getActual().get(), ctx.getInvocationCount().get()); } Assert.assertTrue(success); diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/kafka/TbKafkaNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/kafka/TbKafkaNode.java index 4955ebc400..967d8e4a50 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/kafka/TbKafkaNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/kafka/TbKafkaNode.java @@ -95,8 +95,20 @@ public class TbKafkaNode implements TbNode { @Override public void onMsg(TbContext ctx, TbMsg msg) { String topic = TbNodeUtils.processPattern(config.getTopicPattern(), msg.getMetaData()); + try { + ctx.getExternalCallExecutor().executeAsync(() -> { + publish(ctx, msg, topic); + return null; + }); + } catch (Exception e) { + ctx.tellFailure(msg, e); + } + } + + protected void publish(TbContext ctx, TbMsg msg, String topic) { try { if (!addMetadataKeyValuesAsKafkaHeaders) { + //TODO: external system executor producer.send(new ProducerRecord<>(topic, msg.getData()), (metadata, e) -> processRecord(ctx, msg, metadata, e)); } else { @@ -105,9 +117,8 @@ public class TbKafkaNode implements TbNode { producer.send(new ProducerRecord<>(topic, null, null, null, msg.getData(), headers), (metadata, e) -> processRecord(ctx, msg, metadata, e)); } - } catch (Exception e) { - ctx.tellFailure(msg, e); + log.debug("[{}] Failed to process message: {}", ctx.getSelfId(), msg, e); } }