Refactor script engine resources.

This commit is contained in:
Igor Kulikov 2018-04-05 15:07:48 +03:00
parent 36e45c7630
commit b7cd24c693
13 changed files with 96 additions and 64 deletions

View File

@ -29,9 +29,7 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.util.Base64Utils;
import org.thingsboard.rule.engine.api.ListeningExecutor; import org.thingsboard.rule.engine.api.ListeningExecutor;
import org.thingsboard.rule.engine.js.JsExecutorService;
import org.thingsboard.server.actors.service.ActorService; import org.thingsboard.server.actors.service.ActorService;
import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.Event; import org.thingsboard.server.common.data.Event;
@ -39,7 +37,6 @@ import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent; import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgDataType;
import org.thingsboard.server.common.msg.cluster.ServerAddress; import org.thingsboard.server.common.msg.cluster.ServerAddress;
import org.thingsboard.server.common.transport.auth.DeviceAuthService; import org.thingsboard.server.common.transport.auth.DeviceAuthService;
import org.thingsboard.server.controller.plugin.PluginWebSocketMsgEndpoint; import org.thingsboard.server.controller.plugin.PluginWebSocketMsgEndpoint;
@ -63,15 +60,10 @@ import org.thingsboard.server.service.cluster.rpc.ClusterRpcService;
import org.thingsboard.server.service.component.ComponentDiscoveryService; import org.thingsboard.server.service.component.ComponentDiscoveryService;
import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService; import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.io.IOException; import java.io.IOException;
import java.io.PrintWriter; import java.io.PrintWriter;
import java.io.StringWriter; import java.io.StringWriter;
import java.nio.charset.StandardCharsets;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@Slf4j @Slf4j
@Component @Component
@ -170,6 +162,10 @@ public class ActorSystemContext {
@Setter @Setter
private PluginWebSocketMsgEndpoint wsMsgEndpoint; private PluginWebSocketMsgEndpoint wsMsgEndpoint;
@Autowired
@Getter
private ListeningExecutor jsExecutor;
@Value("${actors.session.sync.timeout}") @Value("${actors.session.sync.timeout}")
@Getter @Getter
private long syncSessionTimeout; private long syncSessionTimeout;
@ -339,9 +335,4 @@ public class ActorSystemContext {
return Exception.class.isInstance(error) ? (Exception) error : new Exception(error); return Exception.class.isInstance(error) ? (Exception) error : new Exception(error);
} }
public ListeningExecutor getJsExecutor() {
//TODO: take thread count from yml.
return new JsExecutorService(1);
}
} }

View File

@ -20,6 +20,7 @@ import akka.actor.Cancellable;
import com.google.common.base.Function; import com.google.common.base.Function;
import org.thingsboard.rule.engine.api.ListeningExecutor; import org.thingsboard.rule.engine.api.ListeningExecutor;
import org.thingsboard.rule.engine.api.RuleEngineTelemetryService; import org.thingsboard.rule.engine.api.RuleEngineTelemetryService;
import org.thingsboard.rule.engine.api.ScriptEngine;
import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.server.actors.ActorSystemContext; import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.common.data.id.RuleNodeId; import org.thingsboard.server.common.data.id.RuleNodeId;
@ -36,6 +37,7 @@ import org.thingsboard.server.dao.relation.RelationService;
import org.thingsboard.server.dao.rule.RuleChainService; import org.thingsboard.server.dao.rule.RuleChainService;
import org.thingsboard.server.dao.timeseries.TimeseriesService; import org.thingsboard.server.dao.timeseries.TimeseriesService;
import org.thingsboard.server.dao.user.UserService; import org.thingsboard.server.dao.user.UserService;
import org.thingsboard.server.service.script.NashornJsEngine;
import scala.concurrent.duration.Duration; import scala.concurrent.duration.Duration;
import java.util.List; import java.util.List;
@ -127,6 +129,11 @@ class DefaultTbContext implements TbContext {
return mainCtx.getJsExecutor(); return mainCtx.getJsExecutor();
} }
@Override
public ScriptEngine createJsScriptEngine(String script, String functionName, String... argNames) {
return new NashornJsEngine(script, functionName, argNames);
}
@Override @Override
public AttributesService getAttributesService() { public AttributesService getAttributesService() {
return mainCtx.getAttributesService(); return mainCtx.getAttributesService();

View File

@ -13,23 +13,38 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.thingsboard.rule.engine.js; package org.thingsboard.server.service.script;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.MoreExecutors;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.thingsboard.rule.engine.api.ListeningExecutor; import org.thingsboard.rule.engine.api.ListeningExecutor;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy; import javax.annotation.PreDestroy;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
@Component
public class JsExecutorService implements ListeningExecutor{ public class JsExecutorService implements ListeningExecutor{
private final ListeningExecutorService service; @Value("${actors.rule.js_thread_pool_size}")
private int jsExecutorThreadPoolSize;
public JsExecutorService(int threadCount) { private ListeningExecutorService service;
this.service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(threadCount));
@PostConstruct
public void init() {
this.service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(jsExecutorThreadPoolSize));
}
@PreDestroy
public void destroy() {
if (this.service != null) {
this.service.shutdown();
}
} }
@Override @Override
@ -37,9 +52,4 @@ public class JsExecutorService implements ListeningExecutor{
return service.submit(task); return service.submit(task);
} }
@PreDestroy
@Override
public void onDestroy() {
service.shutdown();
}
} }

View File

@ -13,32 +13,28 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.thingsboard.rule.engine.js; package org.thingsboard.server.service.script;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import jdk.nashorn.api.scripting.NashornScriptEngineFactory; import jdk.nashorn.api.scripting.NashornScriptEngineFactory;
import jdk.nashorn.api.scripting.ScriptObjectMirror;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData; import org.thingsboard.server.common.msg.TbMsgMetaData;
import javax.script.*; import javax.script.Invocable;
import java.io.IOException; import javax.script.ScriptEngine;
import java.nio.charset.StandardCharsets; import javax.script.ScriptException;
import java.util.Collections; import java.util.Collections;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
@Slf4j @Slf4j
public class NashornJsEngine { public class NashornJsEngine implements org.thingsboard.rule.engine.api.ScriptEngine {
public static final String MSG = "msg"; public static final String MSG = "msg";
public static final String METADATA = "metadata"; public static final String METADATA = "metadata";
@ -129,6 +125,7 @@ public class NashornJsEngine {
} }
} }
@Override
public TbMsg executeUpdate(TbMsg msg) throws ScriptException { public TbMsg executeUpdate(TbMsg msg) throws ScriptException {
JsonNode result = executeScript(msg); JsonNode result = executeScript(msg);
if (!result.isObject()) { if (!result.isObject()) {
@ -138,6 +135,7 @@ public class NashornJsEngine {
return unbindMsg(result, msg); return unbindMsg(result, msg);
} }
@Override
public TbMsg executeGenerate(TbMsg prevMsg) throws ScriptException { public TbMsg executeGenerate(TbMsg prevMsg) throws ScriptException {
JsonNode result = executeScript(prevMsg); JsonNode result = executeScript(prevMsg);
if (!result.isObject()) { if (!result.isObject()) {
@ -147,6 +145,7 @@ public class NashornJsEngine {
return unbindMsg(result, prevMsg); return unbindMsg(result, prevMsg);
} }
@Override
public boolean executeFilter(TbMsg msg) throws ScriptException { public boolean executeFilter(TbMsg msg) throws ScriptException {
JsonNode result = executeScript(msg); JsonNode result = executeScript(msg);
if (!result.isBoolean()) { if (!result.isBoolean()) {
@ -156,7 +155,8 @@ public class NashornJsEngine {
return result.asBoolean(); return result.asBoolean();
} }
public Set<String> executeSwitch(TbMsg msg) throws ScriptException, NoSuchMethodException { @Override
public Set<String> executeSwitch(TbMsg msg) throws ScriptException {
JsonNode result = executeScript(msg); JsonNode result = executeScript(msg);
if (result.isTextual()) { if (result.isTextual()) {
return Collections.singleton(result.asText()); return Collections.singleton(result.asText());
@ -191,6 +191,6 @@ public class NashornJsEngine {
} }
public void destroy() { public void destroy() {
//engine = null; engine = null;
} }
} }

View File

@ -219,6 +219,8 @@ actors:
termination.delay: "${ACTORS_RULE_TERMINATION_DELAY:30000}" termination.delay: "${ACTORS_RULE_TERMINATION_DELAY:30000}"
# Errors for particular actor are persisted once per specified amount of milliseconds # Errors for particular actor are persisted once per specified amount of milliseconds
error_persist_frequency: "${ACTORS_RULE_ERROR_FREQUENCY:3000}" error_persist_frequency: "${ACTORS_RULE_ERROR_FREQUENCY:3000}"
# Specify thread pool size for javascript executor service
js_thread_pool_size: "${ACTORS_RULE_JS_THREAD_POOL_SIZE:10}"
chain: chain:
# Errors for particular actor are persisted once per specified amount of milliseconds # Errors for particular actor are persisted once per specified amount of milliseconds
error_persist_frequency: "${ACTORS_RULE_CHAIN_ERROR_FREQUENCY:3000}" error_persist_frequency: "${ACTORS_RULE_CHAIN_ERROR_FREQUENCY:3000}"

View File

@ -23,5 +23,4 @@ public interface ListeningExecutor {
<T> ListenableFuture<T> executeAsync(Callable<T> task); <T> ListenableFuture<T> executeAsync(Callable<T> task);
void onDestroy();
} }

View File

@ -0,0 +1,35 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.api;
import org.thingsboard.server.common.msg.TbMsg;
import javax.script.ScriptException;
import java.util.Set;
public interface ScriptEngine {
TbMsg executeUpdate(TbMsg msg) throws ScriptException;
TbMsg executeGenerate(TbMsg prevMsg) throws ScriptException;
boolean executeFilter(TbMsg msg) throws ScriptException;
Set<String> executeSwitch(TbMsg msg) throws ScriptException;
void destroy();
}

View File

@ -83,4 +83,6 @@ public interface TbContext {
ListeningExecutor getJsExecutor(); ListeningExecutor getJsExecutor();
ScriptEngine createJsScriptEngine(String script, String functionName, String... argNames);
} }

View File

@ -20,23 +20,13 @@ import com.google.common.util.concurrent.ListenableFuture;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.util.StringUtils; import org.springframework.util.StringUtils;
import org.thingsboard.rule.engine.TbNodeUtils; import org.thingsboard.rule.engine.TbNodeUtils;
import org.thingsboard.rule.engine.api.ListeningExecutor; import org.thingsboard.rule.engine.api.*;
import org.thingsboard.rule.engine.api.RuleNode;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNode;
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.rule.engine.filter.TbJsFilterNodeConfiguration;
import org.thingsboard.rule.engine.js.NashornJsEngine;
import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.EntityIdFactory; import org.thingsboard.server.common.data.id.EntityIdFactory;
import org.thingsboard.server.common.data.plugin.ComponentType; import org.thingsboard.server.common.data.plugin.ComponentType;
import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData; import org.thingsboard.server.common.msg.TbMsgMetaData;
import javax.script.Bindings;
import java.nio.charset.StandardCharsets;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -59,7 +49,7 @@ public class TbMsgGeneratorNode implements TbNode {
public static final String TB_MSG_GENERATOR_NODE_MSG = "TbMsgGeneratorNodeMsg"; public static final String TB_MSG_GENERATOR_NODE_MSG = "TbMsgGeneratorNodeMsg";
private TbMsgGeneratorNodeConfiguration config; private TbMsgGeneratorNodeConfiguration config;
private NashornJsEngine jsEngine; private ScriptEngine jsEngine;
private long delay; private long delay;
private EntityId originatorId; private EntityId originatorId;
private UUID nextTickId; private UUID nextTickId;
@ -74,7 +64,7 @@ public class TbMsgGeneratorNode implements TbNode {
} else { } else {
originatorId = ctx.getSelfId(); originatorId = ctx.getSelfId();
} }
this.jsEngine = new NashornJsEngine(config.getJsScript(), "Generate", "prevMsg", "prevMetadata", "prevMsgType"); this.jsEngine = ctx.createJsScriptEngine(config.getJsScript(), "Generate", "prevMsg", "prevMetadata", "prevMsgType");
sentTickMsg(ctx); sentTickMsg(ctx);
} }

View File

@ -18,12 +18,9 @@ package org.thingsboard.rule.engine.filter;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.thingsboard.rule.engine.TbNodeUtils; import org.thingsboard.rule.engine.TbNodeUtils;
import org.thingsboard.rule.engine.api.*; import org.thingsboard.rule.engine.api.*;
import org.thingsboard.rule.engine.js.NashornJsEngine;
import org.thingsboard.server.common.data.plugin.ComponentType; import org.thingsboard.server.common.data.plugin.ComponentType;
import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsg;
import javax.script.Bindings;
import static org.thingsboard.rule.engine.DonAsynchron.withCallback; import static org.thingsboard.rule.engine.DonAsynchron.withCallback;
@Slf4j @Slf4j
@ -43,12 +40,12 @@ import static org.thingsboard.rule.engine.DonAsynchron.withCallback;
public class TbJsFilterNode implements TbNode { public class TbJsFilterNode implements TbNode {
private TbJsFilterNodeConfiguration config; private TbJsFilterNodeConfiguration config;
private NashornJsEngine jsEngine; private ScriptEngine jsEngine;
@Override @Override
public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException { public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
this.config = TbNodeUtils.convert(configuration, TbJsFilterNodeConfiguration.class); this.config = TbNodeUtils.convert(configuration, TbJsFilterNodeConfiguration.class);
this.jsEngine = new NashornJsEngine(config.getJsScript(), "Filter"); this.jsEngine = ctx.createJsScriptEngine(config.getJsScript(), "Filter");
} }
@Override @Override

View File

@ -18,11 +18,9 @@ package org.thingsboard.rule.engine.filter;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.thingsboard.rule.engine.TbNodeUtils; import org.thingsboard.rule.engine.TbNodeUtils;
import org.thingsboard.rule.engine.api.*; import org.thingsboard.rule.engine.api.*;
import org.thingsboard.rule.engine.js.NashornJsEngine;
import org.thingsboard.server.common.data.plugin.ComponentType; import org.thingsboard.server.common.data.plugin.ComponentType;
import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsg;
import javax.script.Bindings;
import java.util.Set; import java.util.Set;
import static org.thingsboard.rule.engine.DonAsynchron.withCallback; import static org.thingsboard.rule.engine.DonAsynchron.withCallback;
@ -43,12 +41,12 @@ import static org.thingsboard.rule.engine.DonAsynchron.withCallback;
public class TbJsSwitchNode implements TbNode { public class TbJsSwitchNode implements TbNode {
private TbJsSwitchNodeConfiguration config; private TbJsSwitchNodeConfiguration config;
private NashornJsEngine jsEngine; private ScriptEngine jsEngine;
@Override @Override
public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException { public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
this.config = TbNodeUtils.convert(configuration, TbJsSwitchNodeConfiguration.class); this.config = TbNodeUtils.convert(configuration, TbJsSwitchNodeConfiguration.class);
this.jsEngine = new NashornJsEngine(config.getJsScript(), "Switch"); this.jsEngine = ctx.createJsScriptEngine(config.getJsScript(), "Switch");
} }
@Override @Override

View File

@ -18,12 +18,9 @@ package org.thingsboard.rule.engine.transform;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
import org.thingsboard.rule.engine.TbNodeUtils; import org.thingsboard.rule.engine.TbNodeUtils;
import org.thingsboard.rule.engine.api.*; import org.thingsboard.rule.engine.api.*;
import org.thingsboard.rule.engine.js.NashornJsEngine;
import org.thingsboard.server.common.data.plugin.ComponentType; import org.thingsboard.server.common.data.plugin.ComponentType;
import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsg;
import javax.script.Bindings;
@RuleNode( @RuleNode(
type = ComponentType.TRANSFORMATION, type = ComponentType.TRANSFORMATION,
name = "script", name = "script",
@ -41,12 +38,12 @@ import javax.script.Bindings;
public class TbTransformMsgNode extends TbAbstractTransformNode { public class TbTransformMsgNode extends TbAbstractTransformNode {
private TbTransformMsgNodeConfiguration config; private TbTransformMsgNodeConfiguration config;
private NashornJsEngine jsEngine; private ScriptEngine jsEngine;
@Override @Override
public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException { public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
this.config = TbNodeUtils.convert(configuration, TbTransformMsgNodeConfiguration.class); this.config = TbNodeUtils.convert(configuration, TbTransformMsgNodeConfiguration.class);
this.jsEngine = new NashornJsEngine(config.getJsScript(), "Transform"); this.jsEngine = ctx.createJsScriptEngine(config.getJsScript(), "Transform");
setConfig(config); setConfig(config);
} }

View File

@ -353,7 +353,9 @@ export function RuleChainController($state, $scope, $compile, $q, $mdUtil, $time
vm.editingRuleNodeLinkIndex = vm.ruleChainModel.edges.indexOf(edge); vm.editingRuleNodeLinkIndex = vm.ruleChainModel.edges.indexOf(edge);
vm.editingRuleNodeLink = angular.copy(edge); vm.editingRuleNodeLink = angular.copy(edge);
$mdUtil.nextTick(() => { $mdUtil.nextTick(() => {
if (vm.ruleNodeLinkForm) {
vm.ruleNodeLinkForm.$setPristine(); vm.ruleNodeLinkForm.$setPristine();
}
}); });
} }
}, },
@ -366,7 +368,9 @@ export function RuleChainController($state, $scope, $compile, $q, $mdUtil, $time
vm.editingRuleNodeIndex = vm.ruleChainModel.nodes.indexOf(node); vm.editingRuleNodeIndex = vm.ruleChainModel.nodes.indexOf(node);
vm.editingRuleNode = angular.copy(node); vm.editingRuleNode = angular.copy(node);
$mdUtil.nextTick(() => { $mdUtil.nextTick(() => {
if (vm.ruleNodeForm) {
vm.ruleNodeForm.$setPristine(); vm.ruleNodeForm.$setPristine();
}
}); });
} }
} }
@ -737,7 +741,7 @@ export function RuleChainController($state, $scope, $compile, $q, $mdUtil, $time
nodes.push(node); nodes.push(node);
} }
} }
var res = $filter('filter')(vm.ruleChainModel.edges, {source: vm.inputConnectorId}); var res = $filter('filter')(vm.ruleChainModel.edges, {source: vm.inputConnectorId}, true);
if (res && res.length) { if (res && res.length) {
var firstNodeEdge = res[0]; var firstNodeEdge = res[0];
var firstNode = vm.modelservice.nodes.getNodeByConnectorId(firstNodeEdge.destination); var firstNode = vm.modelservice.nodes.getNodeByConnectorId(firstNodeEdge.destination);