From 94ef2b60053499e5741906318c1312064ca7a3f8 Mon Sep 17 00:00:00 2001 From: Andrew Shvayka Date: Fri, 22 Jun 2018 19:32:13 +0300 Subject: [PATCH] Delay node implementation --- .../rule/engine/delay/TbMsgDelayNode.java | 84 +++++++++++++++++++ .../delay/TbMsgDelayNodeConfiguration.java | 35 ++++++++ 2 files changed, 119 insertions(+) create mode 100644 rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/delay/TbMsgDelayNode.java create mode 100644 rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/delay/TbMsgDelayNodeConfiguration.java diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/delay/TbMsgDelayNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/delay/TbMsgDelayNode.java new file mode 100644 index 0000000000..9330136feb --- /dev/null +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/delay/TbMsgDelayNode.java @@ -0,0 +1,84 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.delay; + +import lombok.extern.slf4j.Slf4j; +import org.thingsboard.rule.engine.api.RuleNode; +import org.thingsboard.rule.engine.api.TbContext; +import org.thingsboard.rule.engine.api.TbNode; +import org.thingsboard.rule.engine.api.TbNodeConfiguration; +import org.thingsboard.rule.engine.api.TbNodeException; +import org.thingsboard.rule.engine.api.util.TbNodeUtils; +import org.thingsboard.server.common.data.plugin.ComponentType; +import org.thingsboard.server.common.msg.TbMsg; +import org.thingsboard.server.common.msg.TbMsgMetaData; + +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import static org.thingsboard.rule.engine.api.TbRelationTypes.FAILURE; +import static org.thingsboard.rule.engine.api.TbRelationTypes.SUCCESS; + +@Slf4j +@RuleNode( + type = ComponentType.ACTION, + name = "delay", + configClazz = TbMsgDelayNodeConfiguration.class, + nodeDescription = "Delays incoming message", + nodeDetails = "Delays messages for configurable period.", + icon = "repeat" +) + +public class TbMsgDelayNode implements TbNode { + + private static final String TB_MSG_DELAY_NODE_MSG = "TbMsgDelayNodeMsg"; + + private TbMsgDelayNodeConfiguration config; + private long delay; + private Map pendingMsgs; + + @Override + public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException { + this.config = TbNodeUtils.convert(configuration, TbMsgDelayNodeConfiguration.class); + this.delay = TimeUnit.SECONDS.toMillis(config.getPeriodInSeconds()); + this.pendingMsgs = new HashMap<>(); + } + + @Override + public void onMsg(TbContext ctx, TbMsg msg) { + if (msg.getType().equals(TB_MSG_DELAY_NODE_MSG)) { + TbMsg pendingMsg = pendingMsgs.remove(UUID.fromString(msg.getData())); + if (pendingMsg != null) { + ctx.tellNext(pendingMsg, SUCCESS); + } + } else { + if(pendingMsgs.size() < config.getMaxPendingMsgs()) { + pendingMsgs.put(msg.getId(), msg); + TbMsg tickMsg = ctx.newMsg(TB_MSG_DELAY_NODE_MSG, ctx.getSelfId(), new TbMsgMetaData(), msg.getId().toString()); + ctx.tellSelf(tickMsg, delay); + } else { + ctx.tellNext(msg, FAILURE, new RuntimeException("Max limit of pending messages reached!")); + } + } + } + + @Override + public void destroy() { + pendingMsgs.clear(); + } +} diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/delay/TbMsgDelayNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/delay/TbMsgDelayNodeConfiguration.java new file mode 100644 index 0000000000..411a1a5eb5 --- /dev/null +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/delay/TbMsgDelayNodeConfiguration.java @@ -0,0 +1,35 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.delay; + +import lombok.Data; +import org.thingsboard.rule.engine.api.NodeConfiguration; +import org.thingsboard.server.common.data.EntityType; + +@Data +public class TbMsgDelayNodeConfiguration implements NodeConfiguration { + + private int periodInSeconds; + private int maxPendingMsgs; + + @Override + public TbMsgDelayNodeConfiguration defaultConfiguration() { + TbMsgDelayNodeConfiguration configuration = new TbMsgDelayNodeConfiguration(); + configuration.setPeriodInSeconds(60); + configuration.setMaxPendingMsgs(1000); + return configuration; + } +}