From 831f6201dbf5d4d85c6d38f9a47de05a50e5ac8e Mon Sep 17 00:00:00 2001 From: vparomskiy Date: Tue, 11 Sep 2018 14:36:24 +0300 Subject: [PATCH] sync deduplicated script evaluation/release --- .../actors/ruleChain/DefaultTbContext.java | 2 +- .../controller/RuleChainController.java | 2 +- .../AbstractNashornJsSandboxService.java | 157 ++++++++++++------ .../service/script/JsSandboxService.java | 5 +- .../script/RuleNodeJsScriptEngine.java | 9 +- application/src/main/resources/logback.xml | 2 +- .../script/RuleNodeJsScriptEngineTest.java | 95 ++++++++++- 7 files changed, 206 insertions(+), 66 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java index 7279347b1c..d5260da10b 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java @@ -154,7 +154,7 @@ class DefaultTbContext implements TbContext { @Override public ScriptEngine createJsScriptEngine(String script, String... argNames) { - return new RuleNodeJsScriptEngine(mainCtx.getJsSandbox(), script, argNames); + return new RuleNodeJsScriptEngine(mainCtx.getJsSandbox(), nodeCtx.getSelf().getId(), script, argNames); } @Override diff --git a/application/src/main/java/org/thingsboard/server/controller/RuleChainController.java b/application/src/main/java/org/thingsboard/server/controller/RuleChainController.java index 86b8fdacd7..82ab1702fa 100644 --- a/application/src/main/java/org/thingsboard/server/controller/RuleChainController.java +++ b/application/src/main/java/org/thingsboard/server/controller/RuleChainController.java @@ -276,7 +276,7 @@ public class RuleChainController extends BaseController { String errorText = ""; ScriptEngine engine = null; try { - engine = new RuleNodeJsScriptEngine(jsSandboxService, script, argNames); + engine = new RuleNodeJsScriptEngine(jsSandboxService, getCurrentUser().getId(), script, argNames); TbMsg inMsg = new TbMsg(UUIDs.timeBased(), msgType, null, new TbMsgMetaData(metadata), data, null, null, 0L); switch (scriptType) { case "update": diff --git a/application/src/main/java/org/thingsboard/server/service/script/AbstractNashornJsSandboxService.java b/application/src/main/java/org/thingsboard/server/service/script/AbstractNashornJsSandboxService.java index 724f652609..a0c4d5e4de 100644 --- a/application/src/main/java/org/thingsboard/server/service/script/AbstractNashornJsSandboxService.java +++ b/application/src/main/java/org/thingsboard/server/service/script/AbstractNashornJsSandboxService.java @@ -13,16 +13,19 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.thingsboard.server.service.script; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; import delight.nashornsandbox.NashornSandbox; import delight.nashornsandbox.NashornSandboxes; import jdk.nashorn.api.scripting.NashornScriptEngineFactory; +import lombok.EqualsAndHashCode; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang3.tuple.Pair; +import org.thingsboard.server.common.data.id.EntityId; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; @@ -42,17 +45,20 @@ public abstract class AbstractNashornJsSandboxService implements JsSandboxServic private NashornSandbox sandbox; private ScriptEngine engine; private ExecutorService monitorExecutorService; + private ListeningExecutorService evalExecutorService; private final Map functionsMap = new ConcurrentHashMap<>(); - private final Map blackListedFunctions = new ConcurrentHashMap<>(); - private final Map> scriptToId = new ConcurrentHashMap<>(); - private final Map scriptIdToCount = new ConcurrentHashMap<>(); + private final Map blackListedFunctions = new ConcurrentHashMap<>(); + + private final Map scriptKeyToInfo = new ConcurrentHashMap<>(); + private final Map scriptIdToInfo = new ConcurrentHashMap<>(); @PostConstruct public void init() { if (useJsSandbox()) { sandbox = NashornSandboxes.create(); monitorExecutorService = Executors.newFixedThreadPool(getMonitorThreadPoolSize()); + evalExecutorService = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10)); sandbox.setExecutor(monitorExecutorService); sandbox.setMaxCPUTime(getMaxCpuTime()); sandbox.allowNoBraces(false); @@ -65,9 +71,12 @@ public abstract class AbstractNashornJsSandboxService implements JsSandboxServic @PreDestroy public void stop() { - if (monitorExecutorService != null) { + if (monitorExecutorService != null) { monitorExecutorService.shutdownNow(); } + if (evalExecutorService != null) { + evalExecutorService.shutdownNow(); + } } protected abstract boolean useJsSandbox(); @@ -80,33 +89,39 @@ public abstract class AbstractNashornJsSandboxService implements JsSandboxServic @Override public ListenableFuture eval(JsScriptType scriptType, String scriptBody, String... argNames) { - Pair deduplicated = deduplicate(scriptType, scriptBody); - UUID scriptId = deduplicated.getLeft(); - AtomicInteger duplicateCount = deduplicated.getRight(); + ScriptInfo scriptInfo = deduplicate(scriptType, scriptBody); + UUID scriptId = scriptInfo.getId(); + AtomicInteger duplicateCount = scriptInfo.getCount(); - if(duplicateCount.compareAndSet(0, 1)) { - String functionName = "invokeInternal_" + scriptId.toString().replace('-', '_'); - String jsScript = generateJsScript(scriptType, functionName, scriptBody, argNames); - try { - if (useJsSandbox()) { - sandbox.eval(jsScript); - } else { - engine.eval(jsScript); + synchronized (scriptInfo.getLock()) { + if (duplicateCount.compareAndSet(0, 1)) { + try { + evaluate(scriptId, scriptType, scriptBody, argNames); + } catch (Exception e) { + duplicateCount.decrementAndGet(); + log.warn("Failed to compile JS script: {}", e.getMessage(), e); + return Futures.immediateFailedFuture(e); } - functionsMap.put(scriptId, functionName); - } catch (Exception e) { - duplicateCount.decrementAndGet(); - log.warn("Failed to compile JS script: {}", e.getMessage(), e); - return Futures.immediateFailedFuture(e); + } else { + duplicateCount.incrementAndGet(); } - } else { - duplicateCount.incrementAndGet(); } return Futures.immediateFuture(scriptId); } + private void evaluate(UUID scriptId, JsScriptType scriptType, String scriptBody, String... argNames) throws ScriptException { + String functionName = "invokeInternal_" + scriptId.toString().replace('-', '_'); + String jsScript = generateJsScript(scriptType, functionName, scriptBody, argNames); + if (useJsSandbox()) { + sandbox.eval(jsScript); + } else { + engine.eval(jsScript); + } + functionsMap.put(scriptId, functionName); + } + @Override - public ListenableFuture invokeFunction(UUID scriptId, Object... args) { + public ListenableFuture invokeFunction(UUID scriptId, EntityId entityId, Object... args) { String functionName = functionsMap.get(scriptId); if (functionName == null) { return Futures.immediateFailedFuture(new RuntimeException("No compiled script found for scriptId: [" + scriptId + "]!")); @@ -117,11 +132,12 @@ public abstract class AbstractNashornJsSandboxService implements JsSandboxServic if (useJsSandbox()) { result = sandbox.getSandboxedInvocable().invokeFunction(functionName, args); } else { - result = ((Invocable)engine).invokeFunction(functionName, args); + result = ((Invocable) engine).invokeFunction(functionName, args); } return Futures.immediateFuture(result); } catch (Exception e) { - blackListedFunctions.computeIfAbsent(scriptId, key -> new AtomicInteger(0)).incrementAndGet(); + BlackListKey blackListKey = new BlackListKey(scriptId, entityId); + blackListedFunctions.computeIfAbsent(blackListKey, key -> new AtomicInteger(0)).incrementAndGet(); return Futures.immediateFailedFuture(e); } } else { @@ -131,31 +147,41 @@ public abstract class AbstractNashornJsSandboxService implements JsSandboxServic } @Override - public ListenableFuture release(UUID scriptId) { - AtomicInteger count = scriptIdToCount.get(scriptId); - if(count != null) { - if(count.decrementAndGet() > 0) { - return Futures.immediateFuture(null); - } + public ListenableFuture release(UUID scriptId, EntityId entityId) { + ScriptInfo scriptInfo = scriptIdToInfo.get(scriptId); + if (scriptInfo == null) { + log.warn("Script release called for not existing script id [{}]", scriptId); + return Futures.immediateFuture(null); } - String functionName = functionsMap.get(scriptId); - if (functionName != null) { - try { - if (useJsSandbox()) { - sandbox.eval(functionName + " = undefined;"); - } else { - engine.eval(functionName + " = undefined;"); + synchronized (scriptInfo.getLock()) { + int remainingDuplicates = scriptInfo.getCount().decrementAndGet(); + if (remainingDuplicates > 0) { + return Futures.immediateFuture(null); + } + + String functionName = functionsMap.get(scriptId); + if (functionName != null) { + try { + if (useJsSandbox()) { + sandbox.eval(functionName + " = undefined;"); + } else { + engine.eval(functionName + " = undefined;"); + } + functionsMap.remove(scriptId); + blackListedFunctions.remove(new BlackListKey(scriptId, entityId)); + } catch (ScriptException e) { + log.error("Could not release script [{}] [{}]", scriptId, remainingDuplicates); + return Futures.immediateFailedFuture(e); } - functionsMap.remove(scriptId); - blackListedFunctions.remove(scriptId); - } catch (ScriptException e) { - return Futures.immediateFailedFuture(e); + } else { + log.warn("Function name do not exist for script [{}] [{}]", scriptId, remainingDuplicates); } } return Futures.immediateFuture(null); } + private boolean isBlackListed(UUID scriptId) { if (blackListedFunctions.containsKey(scriptId)) { AtomicInteger errorCount = blackListedFunctions.get(scriptId); @@ -174,15 +200,46 @@ public abstract class AbstractNashornJsSandboxService implements JsSandboxServic } } - private Pair deduplicate(JsScriptType scriptType, String scriptBody) { - Pair precomputed = Pair.of(UUID.randomUUID(), new AtomicInteger()); - - Pair pair = scriptToId.computeIfAbsent(deduplicateKey(scriptType, scriptBody), i -> precomputed); - AtomicInteger duplicateCount = scriptIdToCount.computeIfAbsent(pair.getLeft(), i -> pair.getRight()); - return Pair.of(pair.getLeft(), duplicateCount); + private ScriptInfo deduplicate(JsScriptType scriptType, String scriptBody) { + ScriptInfo meta = ScriptInfo.preInit(); + String key = deduplicateKey(scriptType, scriptBody); + ScriptInfo latestMeta = scriptKeyToInfo.computeIfAbsent(key, i -> meta); + return scriptIdToInfo.computeIfAbsent(latestMeta.getId(), i -> latestMeta); } private String deduplicateKey(JsScriptType scriptType, String scriptBody) { return scriptType + "_" + scriptBody; } + + @Getter + private static class ScriptInfo { + private final UUID id; + private final Object lock; + private final AtomicInteger count; + + ScriptInfo(UUID id, Object lock, AtomicInteger count) { + this.id = id; + this.lock = lock; + this.count = count; + } + + static ScriptInfo preInit() { + UUID preId = UUID.randomUUID(); + AtomicInteger preCount = new AtomicInteger(); + Object preLock = new Object(); + return new ScriptInfo(preId, preLock, preCount); + } + } + + @EqualsAndHashCode + @Getter + private static class BlackListKey { + private final UUID scriptId; + private final EntityId entityId; + + public BlackListKey(UUID scriptId, EntityId entityId) { + this.scriptId = scriptId; + this.entityId = entityId; + } + } } diff --git a/application/src/main/java/org/thingsboard/server/service/script/JsSandboxService.java b/application/src/main/java/org/thingsboard/server/service/script/JsSandboxService.java index ee86c62a25..5e1c676443 100644 --- a/application/src/main/java/org/thingsboard/server/service/script/JsSandboxService.java +++ b/application/src/main/java/org/thingsboard/server/service/script/JsSandboxService.java @@ -17,6 +17,7 @@ package org.thingsboard.server.service.script; import com.google.common.util.concurrent.ListenableFuture; +import org.thingsboard.server.common.data.id.EntityId; import java.util.UUID; @@ -24,8 +25,8 @@ public interface JsSandboxService { ListenableFuture eval(JsScriptType scriptType, String scriptBody, String... argNames); - ListenableFuture invokeFunction(UUID scriptId, Object... args); + ListenableFuture invokeFunction(UUID scriptId, EntityId entityId, Object... args); - ListenableFuture release(UUID scriptId); + ListenableFuture release(UUID scriptId, EntityId entityId); } diff --git a/application/src/main/java/org/thingsboard/server/service/script/RuleNodeJsScriptEngine.java b/application/src/main/java/org/thingsboard/server/service/script/RuleNodeJsScriptEngine.java index 2ba87ec8da..f2b5fd7c12 100644 --- a/application/src/main/java/org/thingsboard/server/service/script/RuleNodeJsScriptEngine.java +++ b/application/src/main/java/org/thingsboard/server/service/script/RuleNodeJsScriptEngine.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Sets; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; +import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgMetaData; @@ -39,9 +40,11 @@ public class RuleNodeJsScriptEngine implements org.thingsboard.rule.engine.api.S private final JsSandboxService sandboxService; private final UUID scriptId; + private final EntityId entityId; - public RuleNodeJsScriptEngine(JsSandboxService sandboxService, String script, String... argNames) { + public RuleNodeJsScriptEngine(JsSandboxService sandboxService, EntityId entityId, String script, String... argNames) { this.sandboxService = sandboxService; + this.entityId = entityId; try { this.scriptId = this.sandboxService.eval(JsScriptType.RULE_NODE_SCRIPT, script, argNames).get(); } catch (Exception e) { @@ -162,7 +165,7 @@ public class RuleNodeJsScriptEngine implements org.thingsboard.rule.engine.api.S private JsonNode executeScript(TbMsg msg) throws ScriptException { try { String[] inArgs = prepareArgs(msg); - String eval = sandboxService.invokeFunction(this.scriptId, inArgs[0], inArgs[1], inArgs[2]).get().toString(); + String eval = sandboxService.invokeFunction(this.scriptId, this.entityId, inArgs[0], inArgs[1], inArgs[2]).get().toString(); return mapper.readTree(eval); } catch (ExecutionException e) { if (e.getCause() instanceof ScriptException) { @@ -176,6 +179,6 @@ public class RuleNodeJsScriptEngine implements org.thingsboard.rule.engine.api.S } public void destroy() { - sandboxService.release(this.scriptId); + sandboxService.release(this.scriptId, this.entityId); } } diff --git a/application/src/main/resources/logback.xml b/application/src/main/resources/logback.xml index 978a570e66..dcfc9301b2 100644 --- a/application/src/main/resources/logback.xml +++ b/application/src/main/resources/logback.xml @@ -17,7 +17,7 @@ --> - + diff --git a/application/src/test/java/org/thingsboard/server/service/script/RuleNodeJsScriptEngineTest.java b/application/src/test/java/org/thingsboard/server/service/script/RuleNodeJsScriptEngineTest.java index ea7038442d..88961dc256 100644 --- a/application/src/test/java/org/thingsboard/server/service/script/RuleNodeJsScriptEngineTest.java +++ b/application/src/test/java/org/thingsboard/server/service/script/RuleNodeJsScriptEngineTest.java @@ -21,12 +21,18 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; import org.thingsboard.rule.engine.api.ScriptEngine; +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.id.RuleNodeId; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgMetaData; import javax.script.ScriptException; - +import java.util.Map; import java.util.Set; +import java.util.UUID; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import static org.junit.Assert.*; @@ -35,6 +41,8 @@ public class RuleNodeJsScriptEngineTest { private ScriptEngine scriptEngine; private TestNashornJsSandboxService jsSandboxService; + private EntityId ruleNodeId = new RuleNodeId(UUIDs.timeBased()); + @Before public void beforeTest() throws Exception { jsSandboxService = new TestNashornJsSandboxService(false, 1, 100, 3); @@ -48,7 +56,7 @@ public class RuleNodeJsScriptEngineTest { @Test public void msgCanBeUpdated() throws ScriptException { String function = "metadata.temp = metadata.temp * 10; return {metadata: metadata};"; - scriptEngine = new RuleNodeJsScriptEngine(jsSandboxService, function); + scriptEngine = new RuleNodeJsScriptEngine(jsSandboxService, ruleNodeId, function); TbMsgMetaData metaData = new TbMsgMetaData(); metaData.putValue("temp", "7"); @@ -65,7 +73,7 @@ public class RuleNodeJsScriptEngineTest { @Test public void newAttributesCanBeAddedInMsg() throws ScriptException { String function = "metadata.newAttr = metadata.humidity - msg.passed; return {metadata: metadata};"; - scriptEngine = new RuleNodeJsScriptEngine(jsSandboxService, function); + scriptEngine = new RuleNodeJsScriptEngine(jsSandboxService, ruleNodeId, function); TbMsgMetaData metaData = new TbMsgMetaData(); metaData.putValue("temp", "7"); metaData.putValue("humidity", "99"); @@ -81,7 +89,7 @@ public class RuleNodeJsScriptEngineTest { @Test public void payloadCanBeUpdated() throws ScriptException { String function = "msg.passed = msg.passed * metadata.temp; msg.bigObj.newProp = 'Ukraine'; return {msg: msg};"; - scriptEngine = new RuleNodeJsScriptEngine(jsSandboxService, function); + scriptEngine = new RuleNodeJsScriptEngine(jsSandboxService, ruleNodeId, function); TbMsgMetaData metaData = new TbMsgMetaData(); metaData.putValue("temp", "7"); metaData.putValue("humidity", "99"); @@ -99,7 +107,7 @@ public class RuleNodeJsScriptEngineTest { @Test public void metadataAccessibleForFilter() throws ScriptException { String function = "return metadata.humidity < 15;"; - scriptEngine = new RuleNodeJsScriptEngine(jsSandboxService, function); + scriptEngine = new RuleNodeJsScriptEngine(jsSandboxService, ruleNodeId, function); TbMsgMetaData metaData = new TbMsgMetaData(); metaData.putValue("temp", "7"); metaData.putValue("humidity", "99"); @@ -113,7 +121,7 @@ public class RuleNodeJsScriptEngineTest { @Test public void dataAccessibleForFilter() throws ScriptException { String function = "return msg.passed < 15 && msg.name === 'Vit' && metadata.temp == 7 && msg.bigObj.prop == 42;"; - scriptEngine = new RuleNodeJsScriptEngine(jsSandboxService, function); + scriptEngine = new RuleNodeJsScriptEngine(jsSandboxService, ruleNodeId, function); TbMsgMetaData metaData = new TbMsgMetaData(); metaData.putValue("temp", "7"); metaData.putValue("humidity", "99"); @@ -134,7 +142,7 @@ public class RuleNodeJsScriptEngineTest { "};\n" + "\n" + "return nextRelation(metadata, msg);"; - scriptEngine = new RuleNodeJsScriptEngine(jsSandboxService, jsCode); + scriptEngine = new RuleNodeJsScriptEngine(jsSandboxService, ruleNodeId, jsCode); TbMsgMetaData metaData = new TbMsgMetaData(); metaData.putValue("temp", "10"); metaData.putValue("humidity", "99"); @@ -156,7 +164,7 @@ public class RuleNodeJsScriptEngineTest { "};\n" + "\n" + "return nextRelation(metadata, msg);"; - scriptEngine = new RuleNodeJsScriptEngine(jsSandboxService, jsCode); + scriptEngine = new RuleNodeJsScriptEngine(jsSandboxService, ruleNodeId, jsCode); TbMsgMetaData metaData = new TbMsgMetaData(); metaData.putValue("temp", "10"); metaData.putValue("humidity", "99"); @@ -168,4 +176,75 @@ public class RuleNodeJsScriptEngineTest { scriptEngine.destroy(); } + @Test + public void concurrentReleasedCorrectly() throws InterruptedException, ExecutionException { + String code = "metadata.temp = metadata.temp * 10; return {metadata: metadata};"; + + int repeat = 1000; + ExecutorService service = Executors.newFixedThreadPool(repeat); + Map scriptIds = new ConcurrentHashMap<>(); + CountDownLatch startLatch = new CountDownLatch(repeat); + CountDownLatch finishLatch = new CountDownLatch(repeat); + AtomicInteger failedCount = new AtomicInteger(0); + + for (int i = 0; i < repeat; i++) { + service.submit(() -> runScript(startLatch, finishLatch, failedCount, scriptIds, code)); + } + + finishLatch.await(); + assertTrue(scriptIds.size() == 1); + assertTrue(failedCount.get() == 0); + + CountDownLatch nextStart = new CountDownLatch(repeat); + CountDownLatch nextFinish = new CountDownLatch(repeat); + for (int i = 0; i < repeat; i++) { + service.submit(() -> runScript(nextStart, nextFinish, failedCount, scriptIds, code)); + } + + nextFinish.await(); + assertTrue(scriptIds.size() == 1); + assertTrue(failedCount.get() == 0); + service.shutdownNow(); + } + + @Test + public void concurrentFailedEvaluationShouldThrowException() throws InterruptedException { + String code = "metadata.temp = metadata.temp * 10; urn {metadata: metadata};"; + + int repeat = 10000; + ExecutorService service = Executors.newFixedThreadPool(repeat); + Map scriptIds = new ConcurrentHashMap<>(); + CountDownLatch startLatch = new CountDownLatch(repeat); + CountDownLatch finishLatch = new CountDownLatch(repeat); + AtomicInteger failedCount = new AtomicInteger(0); + for (int i = 0; i < repeat; i++) { + service.submit(() -> { + service.submit(() -> runScript(startLatch, finishLatch, failedCount, scriptIds, code)); + }); + } + + finishLatch.await(); + assertTrue(scriptIds.isEmpty()); + assertEquals(repeat, failedCount.get()); + service.shutdownNow(); + } + + private void runScript(CountDownLatch startLatch, CountDownLatch finishLatch, AtomicInteger failedCount, + Map scriptIds, String code) { + try { + for (int k = 0; k < 10; k++) { + startLatch.countDown(); + startLatch.await(); + UUID scriptId = jsSandboxService.eval(JsScriptType.RULE_NODE_SCRIPT, code).get(); + scriptIds.put(scriptId, new Object()); + jsSandboxService.invokeFunction(scriptId, ruleNodeId, "{}", "{}", "TEXT").get(); + jsSandboxService.release(scriptId, ruleNodeId).get(); + } + } catch (Throwable th) { + failedCount.incrementAndGet(); + } finally { + finishLatch.countDown(); + } + } + } \ No newline at end of file