sync method replaced with async executeGenerateAsync for ScriptEngine api. affected Generator node, ruleChainController (merged with ce)

This commit is contained in:
Sergey Matvienko 2021-05-05 09:09:12 +03:00
parent 2a5ba8e8ad
commit a3b31337ca
5 changed files with 54 additions and 29 deletions

View File

@ -74,6 +74,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@Slf4j
@ -86,6 +87,7 @@ public class RuleChainController extends BaseController {
public static final String RULE_NODE_ID = "ruleNodeId";
private static final ObjectMapper objectMapper = new ObjectMapper();
public static final int TIMEOUT = 20;
@Autowired
private InstallScripts installScripts;
@ -391,7 +393,7 @@ public class RuleChainController extends BaseController {
output = msgToOutput(engine.executeUpdate(inMsg));
break;
case "generate":
output = msgToOutput(engine.executeGenerate(inMsg));
output = msgToOutput(engine.executeGenerateAsync(inMsg).get(TIMEOUT, TimeUnit.SECONDS));
break;
case "filter":
boolean result = engine.executeFilter(inMsg);

View File

@ -18,7 +18,6 @@ package org.thingsboard.server.service.script;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
@ -26,6 +25,7 @@ import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.springframework.util.StopWatch;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.gen.js.JsInvokeProtos;
import org.thingsboard.server.queue.TbQueueRequestTemplate;
@ -161,6 +161,7 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService {
@Override
protected ListenableFuture<Object> doInvokeFunction(UUID scriptId, String functionName, Object[] args) {
log.trace("doInvokeFunction js-request for uuid {} with timeout {}ms", scriptId, maxRequestsTimeout);
String scriptBody = scriptIdToBodysMap.get(scriptId);
if (scriptBody == null) {
return Futures.immediateFailedFuture(new RuntimeException("No script body found for scriptId: [" + scriptId + "]!"));
@ -180,6 +181,9 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService {
.setInvokeRequest(jsRequestBuilder.build())
.build();
StopWatch stopWatch = new StopWatch();
stopWatch.start();
ListenableFuture<TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> future = requestTemplate.send(new TbProtoJsQueueMsg<>(UUID.randomUUID(), jsRequestWrapper));
if (maxRequestsTimeout > 0) {
future = Futures.withTimeout(future, maxRequestsTimeout, TimeUnit.MILLISECONDS, timeoutExecutorService);
@ -201,6 +205,8 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService {
}
}, callbackExecutor);
return Futures.transform(future, response -> {
stopWatch.stop();
log.trace("doInvokeFunction js-response took {}ms for uuid {}", stopWatch.getTotalTimeMillis(), response.getKey());
JsInvokeProtos.JsInvokeResponse invokeResult = response.getValue().getInvokeResponse();
if (invokeResult.getSuccess()) {
return invokeResult.getResult();

View File

@ -102,7 +102,6 @@ public class RuleNodeJsScriptEngine implements org.thingsboard.rule.engine.api.S
String newMessageType = !StringUtils.isEmpty(messageType) ? messageType : msg.getType();
return TbMsg.transformMsg(msg, newMessageType, msg.getOriginator(), newMetadata, newData);
} catch (Throwable th) {
th.printStackTrace();
throw new RuntimeException("Failed to unbind message data from javascript result", th);
}
}
@ -141,13 +140,16 @@ public class RuleNodeJsScriptEngine implements org.thingsboard.rule.engine.api.S
}
@Override
public TbMsg executeGenerate(TbMsg prevMsg) throws ScriptException {
JsonNode result = executeScript(prevMsg);
if (!result.isObject()) {
log.warn("Wrong result type: {}", result.getNodeType());
throw new ScriptException("Wrong result type: " + result.getNodeType());
}
return unbindMsg(result, prevMsg);
public ListenableFuture<TbMsg> executeGenerateAsync(TbMsg prevMsg) {
log.trace("execute generate async, prevMsg {}", prevMsg);
return Futures.transformAsync(executeScriptAsync(prevMsg), result -> {
if (!result.isObject()) {
log.warn("Wrong result type: {}", result.getNodeType());
throw new ScriptException("Wrong result type: " + result.getNodeType());
}
return Futures.immediateFuture(unbindMsg(result, prevMsg));
}, MoreExecutors.directExecutor());
}
@Override
@ -234,6 +236,7 @@ public class RuleNodeJsScriptEngine implements org.thingsboard.rule.engine.api.S
}
private ListenableFuture<JsonNode> executeScriptAsync(TbMsg msg) {
log.trace("execute script async, msg {}", msg);
String[] inArgs = prepareArgs(msg);
return Futures.transformAsync(sandboxService.invokeFunction(tenantId, msg.getCustomerId(), this.scriptId, inArgs[0], inArgs[1], inArgs[2]),
o -> {

View File

@ -29,7 +29,7 @@ public interface ScriptEngine {
ListenableFuture<List<TbMsg>> executeUpdateAsync(TbMsg msg);
TbMsg executeGenerate(TbMsg prevMsg) throws ScriptException;
ListenableFuture<TbMsg> executeGenerateAsync(TbMsg prevMsg);
boolean executeFilter(TbMsg msg) throws ScriptException;

View File

@ -15,9 +15,12 @@
*/
package org.thingsboard.rule.engine.debug;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.StringUtils;
import org.thingsboard.common.util.TbStopWatch;
import org.thingsboard.rule.engine.api.RuleNode;
import org.thingsboard.rule.engine.api.ScriptEngine;
import org.thingsboard.rule.engine.api.TbContext;
@ -35,6 +38,7 @@ import org.thingsboard.server.common.msg.queue.ServiceQueue;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.thingsboard.common.util.DonAsynchron.withCallback;
import static org.thingsboard.rule.engine.api.TbRelationTypes.SUCCESS;
@ -64,10 +68,11 @@ public class TbMsgGeneratorNode implements TbNode {
private EntityId originatorId;
private UUID nextTickId;
private TbMsg prevMsg;
private volatile boolean initialized;
private final AtomicBoolean initialized = new AtomicBoolean(false);
@Override
public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
log.trace("init generator with config {}", configuration);
this.config = TbNodeUtils.convert(configuration, TbMsgGeneratorNodeConfiguration.class);
this.delay = TimeUnit.SECONDS.toMillis(config.getPeriodInSeconds());
this.currentMsgCount = 0;
@ -81,35 +86,39 @@ public class TbMsgGeneratorNode implements TbNode {
@Override
public void onPartitionChangeMsg(TbContext ctx, PartitionChangeMsg msg) {
log.trace("onPartitionChangeMsg, PartitionChangeMsg {}, config {}", msg, config);
updateGeneratorState(ctx);
}
private void updateGeneratorState(TbContext ctx) {
log.trace("updateGeneratorState, config {}", config);
if (ctx.isLocalEntity(originatorId)) {
if (!initialized) {
initialized = true;
if (initialized.compareAndSet(false, true)) {
this.jsEngine = ctx.createJsScriptEngine(config.getJsScript(), "prevMsg", "prevMetadata", "prevMsgType");
scheduleTickMsg(ctx);
}
} else if (initialized) {
initialized = false;
} else if (initialized.compareAndSet(true, false)) {
destroy();
}
}
@Override
public void onMsg(TbContext ctx, TbMsg msg) {
if (initialized && msg.getType().equals(TB_MSG_GENERATOR_NODE_MSG) && msg.getId().equals(nextTickId)) {
log.trace("onMsg, config {}, msg {}", config, msg);
if (initialized.get() && msg.getType().equals(TB_MSG_GENERATOR_NODE_MSG) && msg.getId().equals(nextTickId)) {
TbStopWatch sw = TbStopWatch.startNew();
withCallback(generate(ctx, msg),
m -> {
if (initialized && (config.getMsgCount() == TbMsgGeneratorNodeConfiguration.UNLIMITED_MSG_COUNT || currentMsgCount < config.getMsgCount())) {
log.trace("onMsg onSuccess callback, took {}ms, config {}, msg {}", sw.stopAndGetTotalTimeMillis(), config, msg);
if (initialized.get() && (config.getMsgCount() == TbMsgGeneratorNodeConfiguration.UNLIMITED_MSG_COUNT || currentMsgCount < config.getMsgCount())) {
ctx.enqueueForTellNext(m, SUCCESS);
scheduleTickMsg(ctx);
currentMsgCount++;
}
},
t -> {
if (initialized && (config.getMsgCount() == TbMsgGeneratorNodeConfiguration.UNLIMITED_MSG_COUNT || currentMsgCount < config.getMsgCount())) {
log.warn("onMsg onFailure callback, took {}ms, config {}, msg {}", sw.stopAndGetTotalTimeMillis(), config, msg);
if (initialized.get() && (config.getMsgCount() == TbMsgGeneratorNodeConfiguration.UNLIMITED_MSG_COUNT || currentMsgCount < config.getMsgCount())) {
ctx.tellFailure(msg, t);
scheduleTickMsg(ctx);
currentMsgCount++;
@ -119,6 +128,7 @@ public class TbMsgGeneratorNode implements TbNode {
}
private void scheduleTickMsg(TbContext ctx) {
log.trace("scheduleTickMsg, config {}", config);
long curTs = System.currentTimeMillis();
if (lastScheduledTs == 0L) {
lastScheduledTs = curTs;
@ -131,22 +141,26 @@ public class TbMsgGeneratorNode implements TbNode {
}
private ListenableFuture<TbMsg> generate(TbContext ctx, TbMsg msg) {
return ctx.getJsExecutor().executeAsync(() -> {
if (prevMsg == null) {
prevMsg = ctx.newMsg(ServiceQueue.MAIN, "", originatorId, msg.getCustomerId(), new TbMsgMetaData(), "{}");
}
if (initialized) {
ctx.logJsEvalRequest();
TbMsg generated = jsEngine.executeGenerate(prevMsg);
log.trace("generate, config {}", config);
if (prevMsg == null) {
prevMsg = ctx.newMsg(ServiceQueue.MAIN, "", originatorId, msg.getCustomerId(), new TbMsgMetaData(), "{}");
}
if (initialized.get()) {
ctx.logJsEvalRequest();
return Futures.transformAsync(jsEngine.executeGenerateAsync(prevMsg), generated -> {
log.trace("generate process response, generated {}, config {}", generated, config);
ctx.logJsEvalResponse();
prevMsg = ctx.newMsg(ServiceQueue.MAIN, generated.getType(), originatorId, msg.getCustomerId(), generated.getMetaData(), generated.getData());
}
return prevMsg;
});
return Futures.immediateFuture(prevMsg);
}, MoreExecutors.directExecutor());
}
return Futures.immediateFuture(prevMsg);
}
@Override
public void destroy() {
log.trace("destroy, config {}", config);
prevMsg = null;
if (jsEngine != null) {
jsEngine.destroy();