diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java index 7279347b1c..d5260da10b 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java @@ -154,7 +154,7 @@ class DefaultTbContext implements TbContext { @Override public ScriptEngine createJsScriptEngine(String script, String... argNames) { - return new RuleNodeJsScriptEngine(mainCtx.getJsSandbox(), script, argNames); + return new RuleNodeJsScriptEngine(mainCtx.getJsSandbox(), nodeCtx.getSelf().getId(), script, argNames); } @Override diff --git a/application/src/main/java/org/thingsboard/server/controller/RuleChainController.java b/application/src/main/java/org/thingsboard/server/controller/RuleChainController.java index 86b8fdacd7..82ab1702fa 100644 --- a/application/src/main/java/org/thingsboard/server/controller/RuleChainController.java +++ b/application/src/main/java/org/thingsboard/server/controller/RuleChainController.java @@ -276,7 +276,7 @@ public class RuleChainController extends BaseController { String errorText = ""; ScriptEngine engine = null; try { - engine = new RuleNodeJsScriptEngine(jsSandboxService, script, argNames); + engine = new RuleNodeJsScriptEngine(jsSandboxService, getCurrentUser().getId(), script, argNames); TbMsg inMsg = new TbMsg(UUIDs.timeBased(), msgType, null, new TbMsgMetaData(metadata), data, null, null, 0L); switch (scriptType) { case "update": diff --git a/application/src/main/java/org/thingsboard/server/service/script/AbstractNashornJsSandboxService.java b/application/src/main/java/org/thingsboard/server/service/script/AbstractNashornJsSandboxService.java index 724f652609..1d89c9d860 100644 --- a/application/src/main/java/org/thingsboard/server/service/script/AbstractNashornJsSandboxService.java +++ b/application/src/main/java/org/thingsboard/server/service/script/AbstractNashornJsSandboxService.java @@ -13,7 +13,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.thingsboard.server.service.script; import com.google.common.util.concurrent.Futures; @@ -21,8 +20,12 @@ import com.google.common.util.concurrent.ListenableFuture; import delight.nashornsandbox.NashornSandbox; import delight.nashornsandbox.NashornSandboxes; import jdk.nashorn.api.scripting.NashornScriptEngineFactory; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang3.tuple.Pair; +import org.thingsboard.server.common.data.id.EntityId; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; @@ -44,9 +47,10 @@ public abstract class AbstractNashornJsSandboxService implements JsSandboxServic private ExecutorService monitorExecutorService; private final Map functionsMap = new ConcurrentHashMap<>(); - private final Map blackListedFunctions = new ConcurrentHashMap<>(); - private final Map> scriptToId = new ConcurrentHashMap<>(); - private final Map scriptIdToCount = new ConcurrentHashMap<>(); + private final Map blackListedFunctions = new ConcurrentHashMap<>(); + + private final Map scriptKeyToInfo = new ConcurrentHashMap<>(); + private final Map scriptIdToInfo = new ConcurrentHashMap<>(); @PostConstruct public void init() { @@ -65,7 +69,7 @@ public abstract class AbstractNashornJsSandboxService implements JsSandboxServic @PreDestroy public void stop() { - if (monitorExecutorService != null) { + if (monitorExecutorService != null) { monitorExecutorService.shutdownNow(); } } @@ -80,90 +84,107 @@ public abstract class AbstractNashornJsSandboxService implements JsSandboxServic @Override public ListenableFuture eval(JsScriptType scriptType, String scriptBody, String... argNames) { - Pair deduplicated = deduplicate(scriptType, scriptBody); - UUID scriptId = deduplicated.getLeft(); - AtomicInteger duplicateCount = deduplicated.getRight(); + ScriptInfo scriptInfo = deduplicate(scriptType, scriptBody); + UUID scriptId = scriptInfo.getId(); + AtomicInteger duplicateCount = scriptInfo.getCount(); - if(duplicateCount.compareAndSet(0, 1)) { - String functionName = "invokeInternal_" + scriptId.toString().replace('-', '_'); - String jsScript = generateJsScript(scriptType, functionName, scriptBody, argNames); - try { - if (useJsSandbox()) { - sandbox.eval(jsScript); - } else { - engine.eval(jsScript); + synchronized (scriptInfo.getLock()) { + if (duplicateCount.compareAndSet(0, 1)) { + try { + evaluate(scriptId, scriptType, scriptBody, argNames); + } catch (Exception e) { + duplicateCount.decrementAndGet(); + log.warn("Failed to compile JS script: {}", e.getMessage(), e); + return Futures.immediateFailedFuture(e); } - functionsMap.put(scriptId, functionName); - } catch (Exception e) { - duplicateCount.decrementAndGet(); - log.warn("Failed to compile JS script: {}", e.getMessage(), e); - return Futures.immediateFailedFuture(e); + } else { + duplicateCount.incrementAndGet(); } - } else { - duplicateCount.incrementAndGet(); } return Futures.immediateFuture(scriptId); } - @Override - public ListenableFuture invokeFunction(UUID scriptId, Object... args) { - String functionName = functionsMap.get(scriptId); - if (functionName == null) { - return Futures.immediateFailedFuture(new RuntimeException("No compiled script found for scriptId: [" + scriptId + "]!")); - } - if (!isBlackListed(scriptId)) { - try { - Object result; - if (useJsSandbox()) { - result = sandbox.getSandboxedInvocable().invokeFunction(functionName, args); - } else { - result = ((Invocable)engine).invokeFunction(functionName, args); - } - return Futures.immediateFuture(result); - } catch (Exception e) { - blackListedFunctions.computeIfAbsent(scriptId, key -> new AtomicInteger(0)).incrementAndGet(); - return Futures.immediateFailedFuture(e); - } + private void evaluate(UUID scriptId, JsScriptType scriptType, String scriptBody, String... argNames) throws ScriptException { + String functionName = "invokeInternal_" + scriptId.toString().replace('-', '_'); + String jsScript = generateJsScript(scriptType, functionName, scriptBody, argNames); + if (useJsSandbox()) { + sandbox.eval(jsScript); } else { - return Futures.immediateFailedFuture( - new RuntimeException("Script is blacklisted due to maximum error count " + getMaxErrors() + "!")); + engine.eval(jsScript); } + functionsMap.put(scriptId, functionName); } @Override - public ListenableFuture release(UUID scriptId) { - AtomicInteger count = scriptIdToCount.get(scriptId); - if(count != null) { - if(count.decrementAndGet() > 0) { - return Futures.immediateFuture(null); - } + public ListenableFuture invokeFunction(UUID scriptId, EntityId entityId, Object... args) { + String functionName = functionsMap.get(scriptId); + if (functionName == null) { + String message = "No compiled script found for scriptId: [" + scriptId + "]!"; + log.warn(message); + return Futures.immediateFailedFuture(new RuntimeException(message)); } - String functionName = functionsMap.get(scriptId); - if (functionName != null) { - try { - if (useJsSandbox()) { - sandbox.eval(functionName + " = undefined;"); - } else { - engine.eval(functionName + " = undefined;"); + BlackListInfo blackListInfo = blackListedFunctions.get(new BlackListKey(scriptId, entityId)); + if (blackListInfo != null && blackListInfo.getCount() >= getMaxErrors()) { + RuntimeException throwable = new RuntimeException("Script is blacklisted due to maximum error count " + getMaxErrors() + "!", blackListInfo.getCause()); + throwable.printStackTrace(); + return Futures.immediateFailedFuture(throwable); + } + + try { + return invoke(functionName, args); + } catch (Exception e) { + BlackListKey blackListKey = new BlackListKey(scriptId, entityId); + blackListedFunctions.computeIfAbsent(blackListKey, key -> new BlackListInfo()).incrementWithReason(e); + return Futures.immediateFailedFuture(e); + } + } + + private ListenableFuture invoke(String functionName, Object... args) throws ScriptException, NoSuchMethodException { + Object result; + if (useJsSandbox()) { + result = sandbox.getSandboxedInvocable().invokeFunction(functionName, args); + } else { + result = ((Invocable) engine).invokeFunction(functionName, args); + } + return Futures.immediateFuture(result); + } + + @Override + public ListenableFuture release(UUID scriptId, EntityId entityId) { + ScriptInfo scriptInfo = scriptIdToInfo.get(scriptId); + if (scriptInfo == null) { + log.warn("Script release called for not existing script id [{}]", scriptId); + return Futures.immediateFuture(null); + } + + synchronized (scriptInfo.getLock()) { + int remainingDuplicates = scriptInfo.getCount().decrementAndGet(); + if (remainingDuplicates > 0) { + return Futures.immediateFuture(null); + } + + String functionName = functionsMap.get(scriptId); + if (functionName != null) { + try { + if (useJsSandbox()) { + sandbox.eval(functionName + " = undefined;"); + } else { + engine.eval(functionName + " = undefined;"); + } + functionsMap.remove(scriptId); + blackListedFunctions.remove(new BlackListKey(scriptId, entityId)); + } catch (ScriptException e) { + log.error("Could not release script [{}] [{}]", scriptId, remainingDuplicates); + return Futures.immediateFailedFuture(e); } - functionsMap.remove(scriptId); - blackListedFunctions.remove(scriptId); - } catch (ScriptException e) { - return Futures.immediateFailedFuture(e); + } else { + log.warn("Function name do not exist for script [{}] [{}]", scriptId, remainingDuplicates); } } return Futures.immediateFuture(null); } - private boolean isBlackListed(UUID scriptId) { - if (blackListedFunctions.containsKey(scriptId)) { - AtomicInteger errorCount = blackListedFunctions.get(scriptId); - return errorCount.get() >= getMaxErrors(); - } else { - return false; - } - } private String generateJsScript(JsScriptType scriptType, String functionName, String scriptBody, String... argNames) { switch (scriptType) { @@ -174,15 +195,66 @@ public abstract class AbstractNashornJsSandboxService implements JsSandboxServic } } - private Pair deduplicate(JsScriptType scriptType, String scriptBody) { - Pair precomputed = Pair.of(UUID.randomUUID(), new AtomicInteger()); - - Pair pair = scriptToId.computeIfAbsent(deduplicateKey(scriptType, scriptBody), i -> precomputed); - AtomicInteger duplicateCount = scriptIdToCount.computeIfAbsent(pair.getLeft(), i -> pair.getRight()); - return Pair.of(pair.getLeft(), duplicateCount); + private ScriptInfo deduplicate(JsScriptType scriptType, String scriptBody) { + ScriptInfo meta = ScriptInfo.preInit(); + String key = deduplicateKey(scriptType, scriptBody); + ScriptInfo latestMeta = scriptKeyToInfo.computeIfAbsent(key, i -> meta); + return scriptIdToInfo.computeIfAbsent(latestMeta.getId(), i -> latestMeta); } private String deduplicateKey(JsScriptType scriptType, String scriptBody) { return scriptType + "_" + scriptBody; } + + @Getter + private static class ScriptInfo { + private final UUID id; + private final Object lock; + private final AtomicInteger count; + + ScriptInfo(UUID id, Object lock, AtomicInteger count) { + this.id = id; + this.lock = lock; + this.count = count; + } + + static ScriptInfo preInit() { + UUID preId = UUID.randomUUID(); + AtomicInteger preCount = new AtomicInteger(); + Object preLock = new Object(); + return new ScriptInfo(preId, preLock, preCount); + } + } + + @EqualsAndHashCode + @Getter + @RequiredArgsConstructor + private static class BlackListKey { + private final UUID scriptId; + private final EntityId entityId; + + } + + @Data + private static class BlackListInfo { + private final AtomicInteger count; + private Exception ex; + + BlackListInfo() { + this.count = new AtomicInteger(0); + } + + void incrementWithReason(Exception e) { + count.incrementAndGet(); + ex = e; + } + + int getCount() { + return count.get(); + } + + Exception getCause() { + return ex; + } + } } diff --git a/application/src/main/java/org/thingsboard/server/service/script/JsSandboxService.java b/application/src/main/java/org/thingsboard/server/service/script/JsSandboxService.java index ee86c62a25..5e1c676443 100644 --- a/application/src/main/java/org/thingsboard/server/service/script/JsSandboxService.java +++ b/application/src/main/java/org/thingsboard/server/service/script/JsSandboxService.java @@ -17,6 +17,7 @@ package org.thingsboard.server.service.script; import com.google.common.util.concurrent.ListenableFuture; +import org.thingsboard.server.common.data.id.EntityId; import java.util.UUID; @@ -24,8 +25,8 @@ public interface JsSandboxService { ListenableFuture eval(JsScriptType scriptType, String scriptBody, String... argNames); - ListenableFuture invokeFunction(UUID scriptId, Object... args); + ListenableFuture invokeFunction(UUID scriptId, EntityId entityId, Object... args); - ListenableFuture release(UUID scriptId); + ListenableFuture release(UUID scriptId, EntityId entityId); } diff --git a/application/src/main/java/org/thingsboard/server/service/script/RuleNodeJsScriptEngine.java b/application/src/main/java/org/thingsboard/server/service/script/RuleNodeJsScriptEngine.java index 2ba87ec8da..767dc05a23 100644 --- a/application/src/main/java/org/thingsboard/server/service/script/RuleNodeJsScriptEngine.java +++ b/application/src/main/java/org/thingsboard/server/service/script/RuleNodeJsScriptEngine.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Sets; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; +import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgMetaData; @@ -39,9 +40,11 @@ public class RuleNodeJsScriptEngine implements org.thingsboard.rule.engine.api.S private final JsSandboxService sandboxService; private final UUID scriptId; + private final EntityId entityId; - public RuleNodeJsScriptEngine(JsSandboxService sandboxService, String script, String... argNames) { + public RuleNodeJsScriptEngine(JsSandboxService sandboxService, EntityId entityId, String script, String... argNames) { this.sandboxService = sandboxService; + this.entityId = entityId; try { this.scriptId = this.sandboxService.eval(JsScriptType.RULE_NODE_SCRIPT, script, argNames).get(); } catch (Exception e) { @@ -162,20 +165,20 @@ public class RuleNodeJsScriptEngine implements org.thingsboard.rule.engine.api.S private JsonNode executeScript(TbMsg msg) throws ScriptException { try { String[] inArgs = prepareArgs(msg); - String eval = sandboxService.invokeFunction(this.scriptId, inArgs[0], inArgs[1], inArgs[2]).get().toString(); + String eval = sandboxService.invokeFunction(this.scriptId, this.entityId, inArgs[0], inArgs[1], inArgs[2]).get().toString(); return mapper.readTree(eval); } catch (ExecutionException e) { if (e.getCause() instanceof ScriptException) { throw (ScriptException)e.getCause(); } else { - throw new ScriptException("Failed to execute js script: " + e.getMessage()); + throw new ScriptException(e); } } catch (Exception e) { - throw new ScriptException("Failed to execute js script: " + e.getMessage()); + throw new ScriptException(e); } } public void destroy() { - sandboxService.release(this.scriptId); + sandboxService.release(this.scriptId, this.entityId); } } diff --git a/application/src/main/resources/logback.xml b/application/src/main/resources/logback.xml index 978a570e66..dcfc9301b2 100644 --- a/application/src/main/resources/logback.xml +++ b/application/src/main/resources/logback.xml @@ -17,7 +17,7 @@ --> - + diff --git a/application/src/test/java/org/thingsboard/server/service/script/RuleNodeJsScriptEngineTest.java b/application/src/test/java/org/thingsboard/server/service/script/RuleNodeJsScriptEngineTest.java index ea7038442d..88961dc256 100644 --- a/application/src/test/java/org/thingsboard/server/service/script/RuleNodeJsScriptEngineTest.java +++ b/application/src/test/java/org/thingsboard/server/service/script/RuleNodeJsScriptEngineTest.java @@ -21,12 +21,18 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; import org.thingsboard.rule.engine.api.ScriptEngine; +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.id.RuleNodeId; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgMetaData; import javax.script.ScriptException; - +import java.util.Map; import java.util.Set; +import java.util.UUID; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import static org.junit.Assert.*; @@ -35,6 +41,8 @@ public class RuleNodeJsScriptEngineTest { private ScriptEngine scriptEngine; private TestNashornJsSandboxService jsSandboxService; + private EntityId ruleNodeId = new RuleNodeId(UUIDs.timeBased()); + @Before public void beforeTest() throws Exception { jsSandboxService = new TestNashornJsSandboxService(false, 1, 100, 3); @@ -48,7 +56,7 @@ public class RuleNodeJsScriptEngineTest { @Test public void msgCanBeUpdated() throws ScriptException { String function = "metadata.temp = metadata.temp * 10; return {metadata: metadata};"; - scriptEngine = new RuleNodeJsScriptEngine(jsSandboxService, function); + scriptEngine = new RuleNodeJsScriptEngine(jsSandboxService, ruleNodeId, function); TbMsgMetaData metaData = new TbMsgMetaData(); metaData.putValue("temp", "7"); @@ -65,7 +73,7 @@ public class RuleNodeJsScriptEngineTest { @Test public void newAttributesCanBeAddedInMsg() throws ScriptException { String function = "metadata.newAttr = metadata.humidity - msg.passed; return {metadata: metadata};"; - scriptEngine = new RuleNodeJsScriptEngine(jsSandboxService, function); + scriptEngine = new RuleNodeJsScriptEngine(jsSandboxService, ruleNodeId, function); TbMsgMetaData metaData = new TbMsgMetaData(); metaData.putValue("temp", "7"); metaData.putValue("humidity", "99"); @@ -81,7 +89,7 @@ public class RuleNodeJsScriptEngineTest { @Test public void payloadCanBeUpdated() throws ScriptException { String function = "msg.passed = msg.passed * metadata.temp; msg.bigObj.newProp = 'Ukraine'; return {msg: msg};"; - scriptEngine = new RuleNodeJsScriptEngine(jsSandboxService, function); + scriptEngine = new RuleNodeJsScriptEngine(jsSandboxService, ruleNodeId, function); TbMsgMetaData metaData = new TbMsgMetaData(); metaData.putValue("temp", "7"); metaData.putValue("humidity", "99"); @@ -99,7 +107,7 @@ public class RuleNodeJsScriptEngineTest { @Test public void metadataAccessibleForFilter() throws ScriptException { String function = "return metadata.humidity < 15;"; - scriptEngine = new RuleNodeJsScriptEngine(jsSandboxService, function); + scriptEngine = new RuleNodeJsScriptEngine(jsSandboxService, ruleNodeId, function); TbMsgMetaData metaData = new TbMsgMetaData(); metaData.putValue("temp", "7"); metaData.putValue("humidity", "99"); @@ -113,7 +121,7 @@ public class RuleNodeJsScriptEngineTest { @Test public void dataAccessibleForFilter() throws ScriptException { String function = "return msg.passed < 15 && msg.name === 'Vit' && metadata.temp == 7 && msg.bigObj.prop == 42;"; - scriptEngine = new RuleNodeJsScriptEngine(jsSandboxService, function); + scriptEngine = new RuleNodeJsScriptEngine(jsSandboxService, ruleNodeId, function); TbMsgMetaData metaData = new TbMsgMetaData(); metaData.putValue("temp", "7"); metaData.putValue("humidity", "99"); @@ -134,7 +142,7 @@ public class RuleNodeJsScriptEngineTest { "};\n" + "\n" + "return nextRelation(metadata, msg);"; - scriptEngine = new RuleNodeJsScriptEngine(jsSandboxService, jsCode); + scriptEngine = new RuleNodeJsScriptEngine(jsSandboxService, ruleNodeId, jsCode); TbMsgMetaData metaData = new TbMsgMetaData(); metaData.putValue("temp", "10"); metaData.putValue("humidity", "99"); @@ -156,7 +164,7 @@ public class RuleNodeJsScriptEngineTest { "};\n" + "\n" + "return nextRelation(metadata, msg);"; - scriptEngine = new RuleNodeJsScriptEngine(jsSandboxService, jsCode); + scriptEngine = new RuleNodeJsScriptEngine(jsSandboxService, ruleNodeId, jsCode); TbMsgMetaData metaData = new TbMsgMetaData(); metaData.putValue("temp", "10"); metaData.putValue("humidity", "99"); @@ -168,4 +176,75 @@ public class RuleNodeJsScriptEngineTest { scriptEngine.destroy(); } + @Test + public void concurrentReleasedCorrectly() throws InterruptedException, ExecutionException { + String code = "metadata.temp = metadata.temp * 10; return {metadata: metadata};"; + + int repeat = 1000; + ExecutorService service = Executors.newFixedThreadPool(repeat); + Map scriptIds = new ConcurrentHashMap<>(); + CountDownLatch startLatch = new CountDownLatch(repeat); + CountDownLatch finishLatch = new CountDownLatch(repeat); + AtomicInteger failedCount = new AtomicInteger(0); + + for (int i = 0; i < repeat; i++) { + service.submit(() -> runScript(startLatch, finishLatch, failedCount, scriptIds, code)); + } + + finishLatch.await(); + assertTrue(scriptIds.size() == 1); + assertTrue(failedCount.get() == 0); + + CountDownLatch nextStart = new CountDownLatch(repeat); + CountDownLatch nextFinish = new CountDownLatch(repeat); + for (int i = 0; i < repeat; i++) { + service.submit(() -> runScript(nextStart, nextFinish, failedCount, scriptIds, code)); + } + + nextFinish.await(); + assertTrue(scriptIds.size() == 1); + assertTrue(failedCount.get() == 0); + service.shutdownNow(); + } + + @Test + public void concurrentFailedEvaluationShouldThrowException() throws InterruptedException { + String code = "metadata.temp = metadata.temp * 10; urn {metadata: metadata};"; + + int repeat = 10000; + ExecutorService service = Executors.newFixedThreadPool(repeat); + Map scriptIds = new ConcurrentHashMap<>(); + CountDownLatch startLatch = new CountDownLatch(repeat); + CountDownLatch finishLatch = new CountDownLatch(repeat); + AtomicInteger failedCount = new AtomicInteger(0); + for (int i = 0; i < repeat; i++) { + service.submit(() -> { + service.submit(() -> runScript(startLatch, finishLatch, failedCount, scriptIds, code)); + }); + } + + finishLatch.await(); + assertTrue(scriptIds.isEmpty()); + assertEquals(repeat, failedCount.get()); + service.shutdownNow(); + } + + private void runScript(CountDownLatch startLatch, CountDownLatch finishLatch, AtomicInteger failedCount, + Map scriptIds, String code) { + try { + for (int k = 0; k < 10; k++) { + startLatch.countDown(); + startLatch.await(); + UUID scriptId = jsSandboxService.eval(JsScriptType.RULE_NODE_SCRIPT, code).get(); + scriptIds.put(scriptId, new Object()); + jsSandboxService.invokeFunction(scriptId, ruleNodeId, "{}", "{}", "TEXT").get(); + jsSandboxService.release(scriptId, ruleNodeId).get(); + } + } catch (Throwable th) { + failedCount.incrementAndGet(); + } finally { + finishLatch.countDown(); + } + } + } \ No newline at end of file diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTelemetryNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTelemetryNode.java index fe35ef6b35..260669f9ff 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTelemetryNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTelemetryNode.java @@ -26,10 +26,8 @@ import org.thingsboard.rule.engine.api.*; import org.thingsboard.rule.engine.api.util.DonAsynchron; import org.thingsboard.rule.engine.api.util.TbNodeUtils; import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery; -import org.thingsboard.server.common.data.kv.BaseTsKvQuery; import org.thingsboard.server.common.data.kv.ReadTsKvQuery; import org.thingsboard.server.common.data.kv.TsKvEntry; -import org.thingsboard.server.common.data.kv.TsKvQuery; import org.thingsboard.server.common.data.plugin.ComponentType; import org.thingsboard.server.common.msg.TbMsg; @@ -39,8 +37,7 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import static org.thingsboard.rule.engine.api.TbRelationTypes.SUCCESS; -import static org.thingsboard.rule.engine.metadata.TbGetTelemetryNodeConfiguration.FETCH_MODE_ALL; -import static org.thingsboard.rule.engine.metadata.TbGetTelemetryNodeConfiguration.MAX_FETCH_SIZE; +import static org.thingsboard.rule.engine.metadata.TbGetTelemetryNodeConfiguration.*; import static org.thingsboard.server.common.data.kv.Aggregation.NONE; /** @@ -64,6 +61,7 @@ public class TbGetTelemetryNode implements TbNode { private long endTsOffset; private int limit; private ObjectMapper mapper; + private String fetchMode; @Override public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException { @@ -72,6 +70,7 @@ public class TbGetTelemetryNode implements TbNode { startTsOffset = TimeUnit.valueOf(config.getStartIntervalTimeUnit()).toMillis(config.getStartInterval()); endTsOffset = TimeUnit.valueOf(config.getEndIntervalTimeUnit()).toMillis(config.getEndInterval()); limit = config.getFetchMode().equals(FETCH_MODE_ALL) ? MAX_FETCH_SIZE : 1; + fetchMode = config.getFetchMode(); mapper = new ObjectMapper(); mapper.configure(JsonGenerator.Feature.QUOTE_FIELD_NAMES, false); mapper.configure(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES, true); @@ -96,14 +95,18 @@ public class TbGetTelemetryNode implements TbNode { } } - //TODO: handle direction; private List buildQueries() { long ts = System.currentTimeMillis(); long startTs = ts - startTsOffset; long endTs = ts - endTsOffset; - + String orderBy; + if (fetchMode.equals(FETCH_MODE_FIRST) || fetchMode.equals(FETCH_MODE_ALL)) { + orderBy = "ASC"; + } else { + orderBy = "DESC"; + } return tsKeyNames.stream() - .map(key -> new BaseReadTsKvQuery(key, startTs, endTs, 1, limit, NONE)) + .map(key -> new BaseReadTsKvQuery(key, startTs, endTs, 1, limit, NONE, orderBy)) .collect(Collectors.toList()); } @@ -116,7 +119,7 @@ public class TbGetTelemetryNode implements TbNode { } for (String key : tsKeyNames) { - if(resultNode.has(key)){ + if (resultNode.has(key)) { msg.getMetaData().putValue(key, resultNode.get(key).toString()); } } @@ -127,11 +130,11 @@ public class TbGetTelemetryNode implements TbNode { } private void processArray(ObjectNode node, TsKvEntry entry) { - if(node.has(entry.getKey())){ + if (node.has(entry.getKey())) { ArrayNode arrayNode = (ArrayNode) node.get(entry.getKey()); ObjectNode obj = buildNode(entry); arrayNode.add(obj); - }else { + } else { ArrayNode arrayNode = mapper.createArrayNode(); ObjectNode obj = buildNode(entry); arrayNode.add(obj);