From 1bf196bfdf2e3aa3ce5c3a7e5cdfbcfe47a6afc0 Mon Sep 17 00:00:00 2001 From: Andrew Shvayka Date: Sat, 8 Dec 2018 17:40:11 +0200 Subject: [PATCH] ThingsBoard Rule Nodes improvements --- .../BaseRuleChainTransactionService.java | 19 ++++++++++++------- ...e.java => TbSynchronizationBeginNode.java} | 14 ++++++++------ ...ode.java => TbSynchronizationEndNode.java} | 8 ++++---- 3 files changed, 24 insertions(+), 17 deletions(-) rename rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transaction/{TbTransactionBeginNode.java => TbSynchronizationBeginNode.java} (79%) rename rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transaction/{TbTransactionEndNode.java => TbSynchronizationEndNode.java} (89%) diff --git a/application/src/main/java/org/thingsboard/server/service/transaction/BaseRuleChainTransactionService.java b/application/src/main/java/org/thingsboard/server/service/transaction/BaseRuleChainTransactionService.java index d7aa7664b5..884604491f 100644 --- a/application/src/main/java/org/thingsboard/server/service/transaction/BaseRuleChainTransactionService.java +++ b/application/src/main/java/org/thingsboard/server/service/transaction/BaseRuleChainTransactionService.java @@ -182,12 +182,14 @@ public class BaseRuleChainTransactionService implements RuleChainTransactionServ while (true) { TbTransactionTask transactionTask = timeoutQueue.peek(); if (transactionTask != null) { + long sleepDuration = 0L; transactionLock.lock(); try { if (transactionTask.isCompleted()) { timeoutQueue.poll(); } else { - if (System.currentTimeMillis() > transactionTask.getExpirationTime()) { + long expIn = transactionTask.getExpirationTime() - System.currentTimeMillis(); + if (expIn < 0) { log.trace("Task has expired! Deleting it...[{}][{}]", transactionTask.getMsg().getId(), transactionTask.getMsg().getType()); timeoutQueue.poll(); executeOnFailure(transactionTask.getOnFailure(), "Task has expired!"); @@ -201,17 +203,20 @@ public class BaseRuleChainTransactionService implements RuleChainTransactionServ } } } else { - try { - log.trace("Task has not expired! Continue executing...[{}][{}]", transactionTask.getMsg().getId(), transactionTask.getMsg().getType()); - TimeUnit.MILLISECONDS.sleep(duration); - } catch (InterruptedException e) { - throw new IllegalStateException("Thread interrupted", e); - } + sleepDuration = Math.min(expIn, duration); } } } finally { transactionLock.unlock(); } + if (sleepDuration > 0L) { + try { + log.trace("Task has not expired! Continue executing...[{}][{}]", transactionTask.getMsg().getId(), transactionTask.getMsg().getType()); + TimeUnit.MILLISECONDS.sleep(sleepDuration); + } catch (InterruptedException e) { + throw new IllegalStateException("Thread interrupted", e); + } + } } else { try { log.trace("Queue is empty, waiting for tasks!"); diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transaction/TbTransactionBeginNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transaction/TbSynchronizationBeginNode.java similarity index 79% rename from rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transaction/TbTransactionBeginNode.java rename to rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transaction/TbSynchronizationBeginNode.java index 75175ef7fa..d8a61173e8 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transaction/TbTransactionBeginNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transaction/TbSynchronizationBeginNode.java @@ -35,13 +35,15 @@ import static org.thingsboard.rule.engine.api.TbRelationTypes.SUCCESS; @Slf4j @RuleNode( type = ComponentType.ACTION, - name = "transaction start", + name = "synchronization start", configClazz = EmptyNodeConfiguration.class, - nodeDescription = "", - nodeDetails = "", + nodeDescription = "Starts synchronization of message processing based on message originator", + nodeDetails = "This node should be used together with \"synchronization end\" node. \n This node will put messages into queue based on message originator id. \n" + + "Subsequent messages will not be processed until the previous message processing is completed or timeout event occurs.\n" + + "Size of the queue per originator and timeout values are configurable on a system level", uiResources = {"static/rulenode/rulenode-core-config.js"}, configDirective = "tbNodeEmptyConfig") -public class TbTransactionBeginNode implements TbNode { +public class TbSynchronizationBeginNode implements TbNode { private EmptyNodeConfiguration config; @@ -51,7 +53,7 @@ public class TbTransactionBeginNode implements TbNode { } @Override - public void onMsg(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException, TbNodeException { + public void onMsg(TbContext ctx, TbMsg msg) { log.trace("Msg enters transaction - [{}][{}]", msg.getId(), msg.getType()); TbMsgTransactionData transactionData = new TbMsgTransactionData(msg.getId(), msg.getOriginator()); @@ -63,7 +65,7 @@ public class TbTransactionBeginNode implements TbNode { ctx.tellNext(startMsg, SUCCESS); }, endMsg -> log.trace("Transaction ended successfully...[{}][{}]", endMsg.getId(), endMsg.getType()), throwable -> { - log.error("Transaction failed! [{}][{}]", tbMsg.getId(), tbMsg.getType(), throwable); + log.trace("Transaction failed! [{}][{}]", tbMsg.getId(), tbMsg.getType(), throwable); ctx.tellFailure(tbMsg, throwable); }); } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transaction/TbTransactionEndNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transaction/TbSynchronizationEndNode.java similarity index 89% rename from rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transaction/TbTransactionEndNode.java rename to rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transaction/TbSynchronizationEndNode.java index a51d97e32a..d95b8427df 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transaction/TbTransactionEndNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transaction/TbSynchronizationEndNode.java @@ -33,14 +33,14 @@ import static org.thingsboard.rule.engine.api.TbRelationTypes.SUCCESS; @Slf4j @RuleNode( type = ComponentType.ACTION, - name = "transaction end", + name = "synchronization end", configClazz = EmptyNodeConfiguration.class, - nodeDescription = "", + nodeDescription = "Stops synchronization of message processing based on message originator", nodeDetails = "", uiResources = {"static/rulenode/rulenode-core-config.js"}, configDirective = ("tbNodeEmptyConfig") ) -public class TbTransactionEndNode implements TbNode { +public class TbSynchronizationEndNode implements TbNode { private EmptyNodeConfiguration config; @@ -50,7 +50,7 @@ public class TbTransactionEndNode implements TbNode { } @Override - public void onMsg(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException, TbNodeException { + public void onMsg(TbContext ctx, TbMsg msg) { ctx.getRuleChainTransactionService().endTransaction(msg, successMsg -> ctx.tellNext(successMsg, SUCCESS), throwable -> ctx.tellFailure(msg, throwable));