sync deduplicated script evaluation/release

This commit is contained in:
vparomskiy 2018-09-11 14:36:24 +03:00
parent 4ba34513a4
commit 831f6201db
7 changed files with 206 additions and 66 deletions

View File

@ -154,7 +154,7 @@ class DefaultTbContext implements TbContext {
@Override @Override
public ScriptEngine createJsScriptEngine(String script, String... argNames) { public ScriptEngine createJsScriptEngine(String script, String... argNames) {
return new RuleNodeJsScriptEngine(mainCtx.getJsSandbox(), script, argNames); return new RuleNodeJsScriptEngine(mainCtx.getJsSandbox(), nodeCtx.getSelf().getId(), script, argNames);
} }
@Override @Override

View File

@ -276,7 +276,7 @@ public class RuleChainController extends BaseController {
String errorText = ""; String errorText = "";
ScriptEngine engine = null; ScriptEngine engine = null;
try { try {
engine = new RuleNodeJsScriptEngine(jsSandboxService, script, argNames); engine = new RuleNodeJsScriptEngine(jsSandboxService, getCurrentUser().getId(), script, argNames);
TbMsg inMsg = new TbMsg(UUIDs.timeBased(), msgType, null, new TbMsgMetaData(metadata), data, null, null, 0L); TbMsg inMsg = new TbMsg(UUIDs.timeBased(), msgType, null, new TbMsgMetaData(metadata), data, null, null, 0L);
switch (scriptType) { switch (scriptType) {
case "update": case "update":

View File

@ -13,16 +13,19 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.thingsboard.server.service.script; package org.thingsboard.server.service.script;
import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import delight.nashornsandbox.NashornSandbox; import delight.nashornsandbox.NashornSandbox;
import delight.nashornsandbox.NashornSandboxes; import delight.nashornsandbox.NashornSandboxes;
import jdk.nashorn.api.scripting.NashornScriptEngineFactory; import jdk.nashorn.api.scripting.NashornScriptEngineFactory;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair; import org.thingsboard.server.common.data.id.EntityId;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy; import javax.annotation.PreDestroy;
@ -42,17 +45,20 @@ public abstract class AbstractNashornJsSandboxService implements JsSandboxServic
private NashornSandbox sandbox; private NashornSandbox sandbox;
private ScriptEngine engine; private ScriptEngine engine;
private ExecutorService monitorExecutorService; private ExecutorService monitorExecutorService;
private ListeningExecutorService evalExecutorService;
private final Map<UUID, String> functionsMap = new ConcurrentHashMap<>(); private final Map<UUID, String> functionsMap = new ConcurrentHashMap<>();
private final Map<UUID,AtomicInteger> blackListedFunctions = new ConcurrentHashMap<>(); private final Map<BlackListKey, AtomicInteger> blackListedFunctions = new ConcurrentHashMap<>();
private final Map<String, Pair<UUID, AtomicInteger>> scriptToId = new ConcurrentHashMap<>();
private final Map<UUID, AtomicInteger> scriptIdToCount = new ConcurrentHashMap<>(); private final Map<String, ScriptInfo> scriptKeyToInfo = new ConcurrentHashMap<>();
private final Map<UUID, ScriptInfo> scriptIdToInfo = new ConcurrentHashMap<>();
@PostConstruct @PostConstruct
public void init() { public void init() {
if (useJsSandbox()) { if (useJsSandbox()) {
sandbox = NashornSandboxes.create(); sandbox = NashornSandboxes.create();
monitorExecutorService = Executors.newFixedThreadPool(getMonitorThreadPoolSize()); monitorExecutorService = Executors.newFixedThreadPool(getMonitorThreadPoolSize());
evalExecutorService = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));
sandbox.setExecutor(monitorExecutorService); sandbox.setExecutor(monitorExecutorService);
sandbox.setMaxCPUTime(getMaxCpuTime()); sandbox.setMaxCPUTime(getMaxCpuTime());
sandbox.allowNoBraces(false); sandbox.allowNoBraces(false);
@ -68,6 +74,9 @@ public abstract class AbstractNashornJsSandboxService implements JsSandboxServic
if (monitorExecutorService != null) { if (monitorExecutorService != null) {
monitorExecutorService.shutdownNow(); monitorExecutorService.shutdownNow();
} }
if (evalExecutorService != null) {
evalExecutorService.shutdownNow();
}
} }
protected abstract boolean useJsSandbox(); protected abstract boolean useJsSandbox();
@ -80,20 +89,14 @@ public abstract class AbstractNashornJsSandboxService implements JsSandboxServic
@Override @Override
public ListenableFuture<UUID> eval(JsScriptType scriptType, String scriptBody, String... argNames) { public ListenableFuture<UUID> eval(JsScriptType scriptType, String scriptBody, String... argNames) {
Pair<UUID, AtomicInteger> deduplicated = deduplicate(scriptType, scriptBody); ScriptInfo scriptInfo = deduplicate(scriptType, scriptBody);
UUID scriptId = deduplicated.getLeft(); UUID scriptId = scriptInfo.getId();
AtomicInteger duplicateCount = deduplicated.getRight(); AtomicInteger duplicateCount = scriptInfo.getCount();
synchronized (scriptInfo.getLock()) {
if (duplicateCount.compareAndSet(0, 1)) { if (duplicateCount.compareAndSet(0, 1)) {
String functionName = "invokeInternal_" + scriptId.toString().replace('-', '_');
String jsScript = generateJsScript(scriptType, functionName, scriptBody, argNames);
try { try {
if (useJsSandbox()) { evaluate(scriptId, scriptType, scriptBody, argNames);
sandbox.eval(jsScript);
} else {
engine.eval(jsScript);
}
functionsMap.put(scriptId, functionName);
} catch (Exception e) { } catch (Exception e) {
duplicateCount.decrementAndGet(); duplicateCount.decrementAndGet();
log.warn("Failed to compile JS script: {}", e.getMessage(), e); log.warn("Failed to compile JS script: {}", e.getMessage(), e);
@ -102,11 +105,23 @@ public abstract class AbstractNashornJsSandboxService implements JsSandboxServic
} else { } else {
duplicateCount.incrementAndGet(); duplicateCount.incrementAndGet();
} }
}
return Futures.immediateFuture(scriptId); return Futures.immediateFuture(scriptId);
} }
private void evaluate(UUID scriptId, JsScriptType scriptType, String scriptBody, String... argNames) throws ScriptException {
String functionName = "invokeInternal_" + scriptId.toString().replace('-', '_');
String jsScript = generateJsScript(scriptType, functionName, scriptBody, argNames);
if (useJsSandbox()) {
sandbox.eval(jsScript);
} else {
engine.eval(jsScript);
}
functionsMap.put(scriptId, functionName);
}
@Override @Override
public ListenableFuture<Object> invokeFunction(UUID scriptId, Object... args) { public ListenableFuture<Object> invokeFunction(UUID scriptId, EntityId entityId, Object... args) {
String functionName = functionsMap.get(scriptId); String functionName = functionsMap.get(scriptId);
if (functionName == null) { if (functionName == null) {
return Futures.immediateFailedFuture(new RuntimeException("No compiled script found for scriptId: [" + scriptId + "]!")); return Futures.immediateFailedFuture(new RuntimeException("No compiled script found for scriptId: [" + scriptId + "]!"));
@ -121,7 +136,8 @@ public abstract class AbstractNashornJsSandboxService implements JsSandboxServic
} }
return Futures.immediateFuture(result); return Futures.immediateFuture(result);
} catch (Exception e) { } catch (Exception e) {
blackListedFunctions.computeIfAbsent(scriptId, key -> new AtomicInteger(0)).incrementAndGet(); BlackListKey blackListKey = new BlackListKey(scriptId, entityId);
blackListedFunctions.computeIfAbsent(blackListKey, key -> new AtomicInteger(0)).incrementAndGet();
return Futures.immediateFailedFuture(e); return Futures.immediateFailedFuture(e);
} }
} else { } else {
@ -131,12 +147,17 @@ public abstract class AbstractNashornJsSandboxService implements JsSandboxServic
} }
@Override @Override
public ListenableFuture<Void> release(UUID scriptId) { public ListenableFuture<Void> release(UUID scriptId, EntityId entityId) {
AtomicInteger count = scriptIdToCount.get(scriptId); ScriptInfo scriptInfo = scriptIdToInfo.get(scriptId);
if(count != null) { if (scriptInfo == null) {
if(count.decrementAndGet() > 0) { log.warn("Script release called for not existing script id [{}]", scriptId);
return Futures.immediateFuture(null); return Futures.immediateFuture(null);
} }
synchronized (scriptInfo.getLock()) {
int remainingDuplicates = scriptInfo.getCount().decrementAndGet();
if (remainingDuplicates > 0) {
return Futures.immediateFuture(null);
} }
String functionName = functionsMap.get(scriptId); String functionName = functionsMap.get(scriptId);
@ -148,14 +169,19 @@ public abstract class AbstractNashornJsSandboxService implements JsSandboxServic
engine.eval(functionName + " = undefined;"); engine.eval(functionName + " = undefined;");
} }
functionsMap.remove(scriptId); functionsMap.remove(scriptId);
blackListedFunctions.remove(scriptId); blackListedFunctions.remove(new BlackListKey(scriptId, entityId));
} catch (ScriptException e) { } catch (ScriptException e) {
log.error("Could not release script [{}] [{}]", scriptId, remainingDuplicates);
return Futures.immediateFailedFuture(e); return Futures.immediateFailedFuture(e);
} }
} else {
log.warn("Function name do not exist for script [{}] [{}]", scriptId, remainingDuplicates);
}
} }
return Futures.immediateFuture(null); return Futures.immediateFuture(null);
} }
private boolean isBlackListed(UUID scriptId) { private boolean isBlackListed(UUID scriptId) {
if (blackListedFunctions.containsKey(scriptId)) { if (blackListedFunctions.containsKey(scriptId)) {
AtomicInteger errorCount = blackListedFunctions.get(scriptId); AtomicInteger errorCount = blackListedFunctions.get(scriptId);
@ -174,15 +200,46 @@ public abstract class AbstractNashornJsSandboxService implements JsSandboxServic
} }
} }
private Pair<UUID, AtomicInteger> deduplicate(JsScriptType scriptType, String scriptBody) { private ScriptInfo deduplicate(JsScriptType scriptType, String scriptBody) {
Pair<UUID, AtomicInteger> precomputed = Pair.of(UUID.randomUUID(), new AtomicInteger()); ScriptInfo meta = ScriptInfo.preInit();
String key = deduplicateKey(scriptType, scriptBody);
Pair<UUID, AtomicInteger> pair = scriptToId.computeIfAbsent(deduplicateKey(scriptType, scriptBody), i -> precomputed); ScriptInfo latestMeta = scriptKeyToInfo.computeIfAbsent(key, i -> meta);
AtomicInteger duplicateCount = scriptIdToCount.computeIfAbsent(pair.getLeft(), i -> pair.getRight()); return scriptIdToInfo.computeIfAbsent(latestMeta.getId(), i -> latestMeta);
return Pair.of(pair.getLeft(), duplicateCount);
} }
private String deduplicateKey(JsScriptType scriptType, String scriptBody) { private String deduplicateKey(JsScriptType scriptType, String scriptBody) {
return scriptType + "_" + scriptBody; return scriptType + "_" + scriptBody;
} }
@Getter
private static class ScriptInfo {
private final UUID id;
private final Object lock;
private final AtomicInteger count;
ScriptInfo(UUID id, Object lock, AtomicInteger count) {
this.id = id;
this.lock = lock;
this.count = count;
}
static ScriptInfo preInit() {
UUID preId = UUID.randomUUID();
AtomicInteger preCount = new AtomicInteger();
Object preLock = new Object();
return new ScriptInfo(preId, preLock, preCount);
}
}
@EqualsAndHashCode
@Getter
private static class BlackListKey {
private final UUID scriptId;
private final EntityId entityId;
public BlackListKey(UUID scriptId, EntityId entityId) {
this.scriptId = scriptId;
this.entityId = entityId;
}
}
} }

View File

@ -17,6 +17,7 @@
package org.thingsboard.server.service.script; package org.thingsboard.server.service.script;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
import org.thingsboard.server.common.data.id.EntityId;
import java.util.UUID; import java.util.UUID;
@ -24,8 +25,8 @@ public interface JsSandboxService {
ListenableFuture<UUID> eval(JsScriptType scriptType, String scriptBody, String... argNames); ListenableFuture<UUID> eval(JsScriptType scriptType, String scriptBody, String... argNames);
ListenableFuture<Object> invokeFunction(UUID scriptId, Object... args); ListenableFuture<Object> invokeFunction(UUID scriptId, EntityId entityId, Object... args);
ListenableFuture<Void> release(UUID scriptId); ListenableFuture<Void> release(UUID scriptId, EntityId entityId);
} }

View File

@ -21,6 +21,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData; import org.thingsboard.server.common.msg.TbMsgMetaData;
@ -39,9 +40,11 @@ public class RuleNodeJsScriptEngine implements org.thingsboard.rule.engine.api.S
private final JsSandboxService sandboxService; private final JsSandboxService sandboxService;
private final UUID scriptId; private final UUID scriptId;
private final EntityId entityId;
public RuleNodeJsScriptEngine(JsSandboxService sandboxService, String script, String... argNames) { public RuleNodeJsScriptEngine(JsSandboxService sandboxService, EntityId entityId, String script, String... argNames) {
this.sandboxService = sandboxService; this.sandboxService = sandboxService;
this.entityId = entityId;
try { try {
this.scriptId = this.sandboxService.eval(JsScriptType.RULE_NODE_SCRIPT, script, argNames).get(); this.scriptId = this.sandboxService.eval(JsScriptType.RULE_NODE_SCRIPT, script, argNames).get();
} catch (Exception e) { } catch (Exception e) {
@ -162,7 +165,7 @@ public class RuleNodeJsScriptEngine implements org.thingsboard.rule.engine.api.S
private JsonNode executeScript(TbMsg msg) throws ScriptException { private JsonNode executeScript(TbMsg msg) throws ScriptException {
try { try {
String[] inArgs = prepareArgs(msg); String[] inArgs = prepareArgs(msg);
String eval = sandboxService.invokeFunction(this.scriptId, inArgs[0], inArgs[1], inArgs[2]).get().toString(); String eval = sandboxService.invokeFunction(this.scriptId, this.entityId, inArgs[0], inArgs[1], inArgs[2]).get().toString();
return mapper.readTree(eval); return mapper.readTree(eval);
} catch (ExecutionException e) { } catch (ExecutionException e) {
if (e.getCause() instanceof ScriptException) { if (e.getCause() instanceof ScriptException) {
@ -176,6 +179,6 @@ public class RuleNodeJsScriptEngine implements org.thingsboard.rule.engine.api.S
} }
public void destroy() { public void destroy() {
sandboxService.release(this.scriptId); sandboxService.release(this.scriptId, this.entityId);
} }
} }

View File

@ -17,7 +17,7 @@
--> -->
<!DOCTYPE configuration> <!DOCTYPE configuration>
<configuration> <configuration scan="true" scanPeriod="10 seconds">
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder> <encoder>

View File

@ -21,12 +21,18 @@ import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.thingsboard.rule.engine.api.ScriptEngine; import org.thingsboard.rule.engine.api.ScriptEngine;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.RuleNodeId;
import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData; import org.thingsboard.server.common.msg.TbMsgMetaData;
import javax.script.ScriptException; import javax.script.ScriptException;
import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.UUID;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.Assert.*; import static org.junit.Assert.*;
@ -35,6 +41,8 @@ public class RuleNodeJsScriptEngineTest {
private ScriptEngine scriptEngine; private ScriptEngine scriptEngine;
private TestNashornJsSandboxService jsSandboxService; private TestNashornJsSandboxService jsSandboxService;
private EntityId ruleNodeId = new RuleNodeId(UUIDs.timeBased());
@Before @Before
public void beforeTest() throws Exception { public void beforeTest() throws Exception {
jsSandboxService = new TestNashornJsSandboxService(false, 1, 100, 3); jsSandboxService = new TestNashornJsSandboxService(false, 1, 100, 3);
@ -48,7 +56,7 @@ public class RuleNodeJsScriptEngineTest {
@Test @Test
public void msgCanBeUpdated() throws ScriptException { public void msgCanBeUpdated() throws ScriptException {
String function = "metadata.temp = metadata.temp * 10; return {metadata: metadata};"; String function = "metadata.temp = metadata.temp * 10; return {metadata: metadata};";
scriptEngine = new RuleNodeJsScriptEngine(jsSandboxService, function); scriptEngine = new RuleNodeJsScriptEngine(jsSandboxService, ruleNodeId, function);
TbMsgMetaData metaData = new TbMsgMetaData(); TbMsgMetaData metaData = new TbMsgMetaData();
metaData.putValue("temp", "7"); metaData.putValue("temp", "7");
@ -65,7 +73,7 @@ public class RuleNodeJsScriptEngineTest {
@Test @Test
public void newAttributesCanBeAddedInMsg() throws ScriptException { public void newAttributesCanBeAddedInMsg() throws ScriptException {
String function = "metadata.newAttr = metadata.humidity - msg.passed; return {metadata: metadata};"; String function = "metadata.newAttr = metadata.humidity - msg.passed; return {metadata: metadata};";
scriptEngine = new RuleNodeJsScriptEngine(jsSandboxService, function); scriptEngine = new RuleNodeJsScriptEngine(jsSandboxService, ruleNodeId, function);
TbMsgMetaData metaData = new TbMsgMetaData(); TbMsgMetaData metaData = new TbMsgMetaData();
metaData.putValue("temp", "7"); metaData.putValue("temp", "7");
metaData.putValue("humidity", "99"); metaData.putValue("humidity", "99");
@ -81,7 +89,7 @@ public class RuleNodeJsScriptEngineTest {
@Test @Test
public void payloadCanBeUpdated() throws ScriptException { public void payloadCanBeUpdated() throws ScriptException {
String function = "msg.passed = msg.passed * metadata.temp; msg.bigObj.newProp = 'Ukraine'; return {msg: msg};"; String function = "msg.passed = msg.passed * metadata.temp; msg.bigObj.newProp = 'Ukraine'; return {msg: msg};";
scriptEngine = new RuleNodeJsScriptEngine(jsSandboxService, function); scriptEngine = new RuleNodeJsScriptEngine(jsSandboxService, ruleNodeId, function);
TbMsgMetaData metaData = new TbMsgMetaData(); TbMsgMetaData metaData = new TbMsgMetaData();
metaData.putValue("temp", "7"); metaData.putValue("temp", "7");
metaData.putValue("humidity", "99"); metaData.putValue("humidity", "99");
@ -99,7 +107,7 @@ public class RuleNodeJsScriptEngineTest {
@Test @Test
public void metadataAccessibleForFilter() throws ScriptException { public void metadataAccessibleForFilter() throws ScriptException {
String function = "return metadata.humidity < 15;"; String function = "return metadata.humidity < 15;";
scriptEngine = new RuleNodeJsScriptEngine(jsSandboxService, function); scriptEngine = new RuleNodeJsScriptEngine(jsSandboxService, ruleNodeId, function);
TbMsgMetaData metaData = new TbMsgMetaData(); TbMsgMetaData metaData = new TbMsgMetaData();
metaData.putValue("temp", "7"); metaData.putValue("temp", "7");
metaData.putValue("humidity", "99"); metaData.putValue("humidity", "99");
@ -113,7 +121,7 @@ public class RuleNodeJsScriptEngineTest {
@Test @Test
public void dataAccessibleForFilter() throws ScriptException { public void dataAccessibleForFilter() throws ScriptException {
String function = "return msg.passed < 15 && msg.name === 'Vit' && metadata.temp == 7 && msg.bigObj.prop == 42;"; String function = "return msg.passed < 15 && msg.name === 'Vit' && metadata.temp == 7 && msg.bigObj.prop == 42;";
scriptEngine = new RuleNodeJsScriptEngine(jsSandboxService, function); scriptEngine = new RuleNodeJsScriptEngine(jsSandboxService, ruleNodeId, function);
TbMsgMetaData metaData = new TbMsgMetaData(); TbMsgMetaData metaData = new TbMsgMetaData();
metaData.putValue("temp", "7"); metaData.putValue("temp", "7");
metaData.putValue("humidity", "99"); metaData.putValue("humidity", "99");
@ -134,7 +142,7 @@ public class RuleNodeJsScriptEngineTest {
"};\n" + "};\n" +
"\n" + "\n" +
"return nextRelation(metadata, msg);"; "return nextRelation(metadata, msg);";
scriptEngine = new RuleNodeJsScriptEngine(jsSandboxService, jsCode); scriptEngine = new RuleNodeJsScriptEngine(jsSandboxService, ruleNodeId, jsCode);
TbMsgMetaData metaData = new TbMsgMetaData(); TbMsgMetaData metaData = new TbMsgMetaData();
metaData.putValue("temp", "10"); metaData.putValue("temp", "10");
metaData.putValue("humidity", "99"); metaData.putValue("humidity", "99");
@ -156,7 +164,7 @@ public class RuleNodeJsScriptEngineTest {
"};\n" + "};\n" +
"\n" + "\n" +
"return nextRelation(metadata, msg);"; "return nextRelation(metadata, msg);";
scriptEngine = new RuleNodeJsScriptEngine(jsSandboxService, jsCode); scriptEngine = new RuleNodeJsScriptEngine(jsSandboxService, ruleNodeId, jsCode);
TbMsgMetaData metaData = new TbMsgMetaData(); TbMsgMetaData metaData = new TbMsgMetaData();
metaData.putValue("temp", "10"); metaData.putValue("temp", "10");
metaData.putValue("humidity", "99"); metaData.putValue("humidity", "99");
@ -168,4 +176,75 @@ public class RuleNodeJsScriptEngineTest {
scriptEngine.destroy(); scriptEngine.destroy();
} }
@Test
public void concurrentReleasedCorrectly() throws InterruptedException, ExecutionException {
String code = "metadata.temp = metadata.temp * 10; return {metadata: metadata};";
int repeat = 1000;
ExecutorService service = Executors.newFixedThreadPool(repeat);
Map<UUID, Object> scriptIds = new ConcurrentHashMap<>();
CountDownLatch startLatch = new CountDownLatch(repeat);
CountDownLatch finishLatch = new CountDownLatch(repeat);
AtomicInteger failedCount = new AtomicInteger(0);
for (int i = 0; i < repeat; i++) {
service.submit(() -> runScript(startLatch, finishLatch, failedCount, scriptIds, code));
}
finishLatch.await();
assertTrue(scriptIds.size() == 1);
assertTrue(failedCount.get() == 0);
CountDownLatch nextStart = new CountDownLatch(repeat);
CountDownLatch nextFinish = new CountDownLatch(repeat);
for (int i = 0; i < repeat; i++) {
service.submit(() -> runScript(nextStart, nextFinish, failedCount, scriptIds, code));
}
nextFinish.await();
assertTrue(scriptIds.size() == 1);
assertTrue(failedCount.get() == 0);
service.shutdownNow();
}
@Test
public void concurrentFailedEvaluationShouldThrowException() throws InterruptedException {
String code = "metadata.temp = metadata.temp * 10; urn {metadata: metadata};";
int repeat = 10000;
ExecutorService service = Executors.newFixedThreadPool(repeat);
Map<UUID, Object> scriptIds = new ConcurrentHashMap<>();
CountDownLatch startLatch = new CountDownLatch(repeat);
CountDownLatch finishLatch = new CountDownLatch(repeat);
AtomicInteger failedCount = new AtomicInteger(0);
for (int i = 0; i < repeat; i++) {
service.submit(() -> {
service.submit(() -> runScript(startLatch, finishLatch, failedCount, scriptIds, code));
});
}
finishLatch.await();
assertTrue(scriptIds.isEmpty());
assertEquals(repeat, failedCount.get());
service.shutdownNow();
}
private void runScript(CountDownLatch startLatch, CountDownLatch finishLatch, AtomicInteger failedCount,
Map<UUID, Object> scriptIds, String code) {
try {
for (int k = 0; k < 10; k++) {
startLatch.countDown();
startLatch.await();
UUID scriptId = jsSandboxService.eval(JsScriptType.RULE_NODE_SCRIPT, code).get();
scriptIds.put(scriptId, new Object());
jsSandboxService.invokeFunction(scriptId, ruleNodeId, "{}", "{}", "TEXT").get();
jsSandboxService.release(scriptId, ruleNodeId).get();
}
} catch (Throwable th) {
failedCount.incrementAndGet();
} finally {
finishLatch.countDown();
}
}
} }