From a3b31337cae7f0b639285855fd1cc050d05cfda4 Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Wed, 5 May 2021 09:09:12 +0300 Subject: [PATCH] sync method replaced with async executeGenerateAsync for ScriptEngine api. affected Generator node, ruleChainController (merged with ce) --- .../controller/RuleChainController.java | 4 +- .../service/script/RemoteJsInvokeService.java | 8 ++- .../script/RuleNodeJsScriptEngine.java | 19 ++++--- .../rule/engine/api/ScriptEngine.java | 2 +- .../rule/engine/debug/TbMsgGeneratorNode.java | 50 ++++++++++++------- 5 files changed, 54 insertions(+), 29 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/controller/RuleChainController.java b/application/src/main/java/org/thingsboard/server/controller/RuleChainController.java index d5fcc2706f..3b9af2f0f2 100644 --- a/application/src/main/java/org/thingsboard/server/controller/RuleChainController.java +++ b/application/src/main/java/org/thingsboard/server/controller/RuleChainController.java @@ -74,6 +74,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @Slf4j @@ -86,6 +87,7 @@ public class RuleChainController extends BaseController { public static final String RULE_NODE_ID = "ruleNodeId"; private static final ObjectMapper objectMapper = new ObjectMapper(); + public static final int TIMEOUT = 20; @Autowired private InstallScripts installScripts; @@ -391,7 +393,7 @@ public class RuleChainController extends BaseController { output = msgToOutput(engine.executeUpdate(inMsg)); break; case "generate": - output = msgToOutput(engine.executeGenerate(inMsg)); + output = msgToOutput(engine.executeGenerateAsync(inMsg).get(TIMEOUT, TimeUnit.SECONDS)); break; case "filter": boolean result = engine.executeFilter(inMsg); diff --git a/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java b/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java index 334a471973..1bc9203333 100644 --- a/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java +++ b/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java @@ -18,7 +18,6 @@ package org.thingsboard.server.service.script; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.MoreExecutors; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -26,6 +25,7 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; +import org.springframework.util.StopWatch; import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.server.gen.js.JsInvokeProtos; import org.thingsboard.server.queue.TbQueueRequestTemplate; @@ -161,6 +161,7 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService { @Override protected ListenableFuture doInvokeFunction(UUID scriptId, String functionName, Object[] args) { + log.trace("doInvokeFunction js-request for uuid {} with timeout {}ms", scriptId, maxRequestsTimeout); String scriptBody = scriptIdToBodysMap.get(scriptId); if (scriptBody == null) { return Futures.immediateFailedFuture(new RuntimeException("No script body found for scriptId: [" + scriptId + "]!")); @@ -180,6 +181,9 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService { .setInvokeRequest(jsRequestBuilder.build()) .build(); + StopWatch stopWatch = new StopWatch(); + stopWatch.start(); + ListenableFuture> future = requestTemplate.send(new TbProtoJsQueueMsg<>(UUID.randomUUID(), jsRequestWrapper)); if (maxRequestsTimeout > 0) { future = Futures.withTimeout(future, maxRequestsTimeout, TimeUnit.MILLISECONDS, timeoutExecutorService); @@ -201,6 +205,8 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService { } }, callbackExecutor); return Futures.transform(future, response -> { + stopWatch.stop(); + log.trace("doInvokeFunction js-response took {}ms for uuid {}", stopWatch.getTotalTimeMillis(), response.getKey()); JsInvokeProtos.JsInvokeResponse invokeResult = response.getValue().getInvokeResponse(); if (invokeResult.getSuccess()) { return invokeResult.getResult(); diff --git a/application/src/main/java/org/thingsboard/server/service/script/RuleNodeJsScriptEngine.java b/application/src/main/java/org/thingsboard/server/service/script/RuleNodeJsScriptEngine.java index 066ce71a58..c688d6978d 100644 --- a/application/src/main/java/org/thingsboard/server/service/script/RuleNodeJsScriptEngine.java +++ b/application/src/main/java/org/thingsboard/server/service/script/RuleNodeJsScriptEngine.java @@ -102,7 +102,6 @@ public class RuleNodeJsScriptEngine implements org.thingsboard.rule.engine.api.S String newMessageType = !StringUtils.isEmpty(messageType) ? messageType : msg.getType(); return TbMsg.transformMsg(msg, newMessageType, msg.getOriginator(), newMetadata, newData); } catch (Throwable th) { - th.printStackTrace(); throw new RuntimeException("Failed to unbind message data from javascript result", th); } } @@ -141,13 +140,16 @@ public class RuleNodeJsScriptEngine implements org.thingsboard.rule.engine.api.S } @Override - public TbMsg executeGenerate(TbMsg prevMsg) throws ScriptException { - JsonNode result = executeScript(prevMsg); - if (!result.isObject()) { - log.warn("Wrong result type: {}", result.getNodeType()); - throw new ScriptException("Wrong result type: " + result.getNodeType()); - } - return unbindMsg(result, prevMsg); + public ListenableFuture executeGenerateAsync(TbMsg prevMsg) { + log.trace("execute generate async, prevMsg {}", prevMsg); + return Futures.transformAsync(executeScriptAsync(prevMsg), result -> { + if (!result.isObject()) { + log.warn("Wrong result type: {}", result.getNodeType()); + throw new ScriptException("Wrong result type: " + result.getNodeType()); + } + return Futures.immediateFuture(unbindMsg(result, prevMsg)); + }, MoreExecutors.directExecutor()); + } @Override @@ -234,6 +236,7 @@ public class RuleNodeJsScriptEngine implements org.thingsboard.rule.engine.api.S } private ListenableFuture executeScriptAsync(TbMsg msg) { + log.trace("execute script async, msg {}", msg); String[] inArgs = prepareArgs(msg); return Futures.transformAsync(sandboxService.invokeFunction(tenantId, msg.getCustomerId(), this.scriptId, inArgs[0], inArgs[1], inArgs[2]), o -> { diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/ScriptEngine.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/ScriptEngine.java index 49420e8feb..a5dcf535e4 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/ScriptEngine.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/ScriptEngine.java @@ -29,7 +29,7 @@ public interface ScriptEngine { ListenableFuture> executeUpdateAsync(TbMsg msg); - TbMsg executeGenerate(TbMsg prevMsg) throws ScriptException; + ListenableFuture executeGenerateAsync(TbMsg prevMsg); boolean executeFilter(TbMsg msg) throws ScriptException; diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNode.java index b5a47b4a0d..8fa4b97063 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNode.java @@ -15,9 +15,12 @@ */ package org.thingsboard.rule.engine.debug; +import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import lombok.extern.slf4j.Slf4j; import org.springframework.util.StringUtils; +import org.thingsboard.common.util.TbStopWatch; import org.thingsboard.rule.engine.api.RuleNode; import org.thingsboard.rule.engine.api.ScriptEngine; import org.thingsboard.rule.engine.api.TbContext; @@ -35,6 +38,7 @@ import org.thingsboard.server.common.msg.queue.ServiceQueue; import java.util.UUID; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import static org.thingsboard.common.util.DonAsynchron.withCallback; import static org.thingsboard.rule.engine.api.TbRelationTypes.SUCCESS; @@ -64,10 +68,11 @@ public class TbMsgGeneratorNode implements TbNode { private EntityId originatorId; private UUID nextTickId; private TbMsg prevMsg; - private volatile boolean initialized; + private final AtomicBoolean initialized = new AtomicBoolean(false); @Override public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException { + log.trace("init generator with config {}", configuration); this.config = TbNodeUtils.convert(configuration, TbMsgGeneratorNodeConfiguration.class); this.delay = TimeUnit.SECONDS.toMillis(config.getPeriodInSeconds()); this.currentMsgCount = 0; @@ -81,35 +86,39 @@ public class TbMsgGeneratorNode implements TbNode { @Override public void onPartitionChangeMsg(TbContext ctx, PartitionChangeMsg msg) { + log.trace("onPartitionChangeMsg, PartitionChangeMsg {}, config {}", msg, config); updateGeneratorState(ctx); } private void updateGeneratorState(TbContext ctx) { + log.trace("updateGeneratorState, config {}", config); if (ctx.isLocalEntity(originatorId)) { - if (!initialized) { - initialized = true; + if (initialized.compareAndSet(false, true)) { this.jsEngine = ctx.createJsScriptEngine(config.getJsScript(), "prevMsg", "prevMetadata", "prevMsgType"); scheduleTickMsg(ctx); } - } else if (initialized) { - initialized = false; + } else if (initialized.compareAndSet(true, false)) { destroy(); } } @Override public void onMsg(TbContext ctx, TbMsg msg) { - if (initialized && msg.getType().equals(TB_MSG_GENERATOR_NODE_MSG) && msg.getId().equals(nextTickId)) { + log.trace("onMsg, config {}, msg {}", config, msg); + if (initialized.get() && msg.getType().equals(TB_MSG_GENERATOR_NODE_MSG) && msg.getId().equals(nextTickId)) { + TbStopWatch sw = TbStopWatch.startNew(); withCallback(generate(ctx, msg), m -> { - if (initialized && (config.getMsgCount() == TbMsgGeneratorNodeConfiguration.UNLIMITED_MSG_COUNT || currentMsgCount < config.getMsgCount())) { + log.trace("onMsg onSuccess callback, took {}ms, config {}, msg {}", sw.stopAndGetTotalTimeMillis(), config, msg); + if (initialized.get() && (config.getMsgCount() == TbMsgGeneratorNodeConfiguration.UNLIMITED_MSG_COUNT || currentMsgCount < config.getMsgCount())) { ctx.enqueueForTellNext(m, SUCCESS); scheduleTickMsg(ctx); currentMsgCount++; } }, t -> { - if (initialized && (config.getMsgCount() == TbMsgGeneratorNodeConfiguration.UNLIMITED_MSG_COUNT || currentMsgCount < config.getMsgCount())) { + log.warn("onMsg onFailure callback, took {}ms, config {}, msg {}", sw.stopAndGetTotalTimeMillis(), config, msg); + if (initialized.get() && (config.getMsgCount() == TbMsgGeneratorNodeConfiguration.UNLIMITED_MSG_COUNT || currentMsgCount < config.getMsgCount())) { ctx.tellFailure(msg, t); scheduleTickMsg(ctx); currentMsgCount++; @@ -119,6 +128,7 @@ public class TbMsgGeneratorNode implements TbNode { } private void scheduleTickMsg(TbContext ctx) { + log.trace("scheduleTickMsg, config {}", config); long curTs = System.currentTimeMillis(); if (lastScheduledTs == 0L) { lastScheduledTs = curTs; @@ -131,22 +141,26 @@ public class TbMsgGeneratorNode implements TbNode { } private ListenableFuture generate(TbContext ctx, TbMsg msg) { - return ctx.getJsExecutor().executeAsync(() -> { - if (prevMsg == null) { - prevMsg = ctx.newMsg(ServiceQueue.MAIN, "", originatorId, msg.getCustomerId(), new TbMsgMetaData(), "{}"); - } - if (initialized) { - ctx.logJsEvalRequest(); - TbMsg generated = jsEngine.executeGenerate(prevMsg); + log.trace("generate, config {}", config); + if (prevMsg == null) { + prevMsg = ctx.newMsg(ServiceQueue.MAIN, "", originatorId, msg.getCustomerId(), new TbMsgMetaData(), "{}"); + } + if (initialized.get()) { + ctx.logJsEvalRequest(); + return Futures.transformAsync(jsEngine.executeGenerateAsync(prevMsg), generated -> { + log.trace("generate process response, generated {}, config {}", generated, config); ctx.logJsEvalResponse(); prevMsg = ctx.newMsg(ServiceQueue.MAIN, generated.getType(), originatorId, msg.getCustomerId(), generated.getMetaData(), generated.getData()); - } - return prevMsg; - }); + return Futures.immediateFuture(prevMsg); + }, MoreExecutors.directExecutor()); + } + return Futures.immediateFuture(prevMsg); + } @Override public void destroy() { + log.trace("destroy, config {}", config); prevMsg = null; if (jsEngine != null) { jsEngine.destroy();