diff --git a/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java b/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java index 37c4871557..bebd2cc4d3 100644 --- a/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java +++ b/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java @@ -279,9 +279,6 @@ public class TelemetryController extends BaseController { deleteFromTs = 0L; deleteToTs = System.currentTimeMillis(); } else { - if (startTs == null || endTs == null) { - return getImmediateDeferredResult("StartTs and endTs could not be empty when deleteAllDataForKeys equals [false]", HttpStatus.BAD_REQUEST); - } deleteFromTs = startTs; deleteToTs = endTs; } diff --git a/application/src/main/java/org/thingsboard/server/service/transaction/BaseRuleChainTransactionService.java b/application/src/main/java/org/thingsboard/server/service/transaction/BaseRuleChainTransactionService.java index a378ff1e15..f09a0f950f 100644 --- a/application/src/main/java/org/thingsboard/server/service/transaction/BaseRuleChainTransactionService.java +++ b/application/src/main/java/org/thingsboard/server/service/transaction/BaseRuleChainTransactionService.java @@ -1,12 +1,12 @@ /** * Copyright © 2016-2018 The Thingsboard Authors - *

+ * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

+ * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -23,10 +23,17 @@ import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.msg.TbMsg; +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; +import java.util.Queue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Consumer; @@ -37,28 +44,45 @@ public class BaseRuleChainTransactionService implements RuleChainTransactionServ @Value("${actors.rule.transaction.queue_size}") private int finalQueueSize; + @Value("${actors.rule.transaction.duration}") + private long duration; private final Lock transactionLock = new ReentrantLock(); private final ConcurrentMap> transactionMap = new ConcurrentHashMap<>(); + private final Queue timeoutQueue = new ConcurrentLinkedQueue<>(); - //TODO: add delete on timeout from queue -> onFailure accept + private ExecutorService timeoutExecutor; + + @PostConstruct + public void init() { + timeoutExecutor = Executors.newSingleThreadExecutor(); + executeOnTimeout(); + } + + @PreDestroy + public void destroy() { + if (timeoutExecutor != null) { + timeoutExecutor.shutdownNow(); + } + } @Override public void beginTransaction(TbContext ctx, TbMsg msg, Consumer onStart, Consumer onEnd, Consumer onFailure) { - BlockingQueue queue = transactionMap.computeIfAbsent(msg.getTransactionData().getOriginatorId(), id -> - new LinkedBlockingQueue<>(finalQueueSize)); transactionLock.lock(); try { + BlockingQueue queue = transactionMap.computeIfAbsent(msg.getTransactionData().getOriginatorId(), id -> + new LinkedBlockingQueue<>(finalQueueSize)); + TbTransactionTask task = new TbTransactionTask(msg, onStart, onEnd, onFailure); int queueSize = queue.size(); if (queueSize >= finalQueueSize) { - onFailure.accept(new RuntimeException("Queue has no space!")); + task.getOnFailure().accept(new RuntimeException("Queue has no space!")); } else { - addMsgToQueue(queue, task, onFailure); + addMsgToQueues(queue, task); if (queueSize == 0) { - onStart.accept(msg); + startTransactionTask(task); } else { - log.info("Msg [{}] [{}] is waiting to start transaction!", msg.getId(), msg.getType()); + log.trace("Msg [{}] [{}] is waiting to start transaction!", msg.getId(), msg.getType()); } } } finally { @@ -66,41 +90,85 @@ public class BaseRuleChainTransactionService implements RuleChainTransactionServ } } - private void addMsgToQueue(BlockingQueue queue, TbTransactionTask task, Consumer onFailure) { - try { - queue.add(task); - log.info("Added msg to queue, size: [{}]", queue.size()); - } catch (Exception e) { - log.error("Error when trying to add msg [{}] to the queue", task.getMsg(), e); - onFailure.accept(e); - } + private void addMsgToQueues(BlockingQueue queue, TbTransactionTask task) { + queue.offer(task); + timeoutQueue.offer(task); + log.trace("Added msg to queue, size: [{}]", queue.size()); } @Override public boolean endTransaction(TbContext ctx, TbMsg msg, Consumer onFailure) { - transactionLock.lock(); - try { - BlockingQueue queue = transactionMap.get(msg.getTransactionData().getOriginatorId()); - try { - TbTransactionTask currentTask = queue.element(); - if (currentTask.getMsg().getTransactionData().getTransactionId().equals(msg.getTransactionData().getTransactionId())) { - queue.remove(); - log.info("Removed msg from queue, size [{}]", queue.size()); - currentTask.getOnEnd().accept(currentTask.getMsg()); + BlockingQueue queue = transactionMap.get(msg.getTransactionData().getOriginatorId()); - TbTransactionTask nextTask = queue.peek(); - if (nextTask != null) { - nextTask.getOnStart().accept(nextTask.getMsg()); - } + TbTransactionTask currentTask = queue.peek(); + if (currentTask != null) { + if (currentTask.getMsg().getTransactionData().getTransactionId().equals(msg.getTransactionData().getTransactionId())) { + currentTask.setIsCompleted(true); + queue.remove(); + log.trace("Removed msg from queue, size [{}]", queue.size()); + currentTask.getOnEnd().accept(currentTask.getMsg()); + + TbTransactionTask nextTask = queue.peek(); + if (nextTask != null) { + startTransactionTask(nextTask); } - } catch (Exception e) { - log.error("Queue is empty!", queue); - onFailure.accept(e); + } else { + log.trace("Task has expired!"); + onFailure.accept(new RuntimeException("Task has expired!")); return true; } - } finally { - transactionLock.unlock(); + } else { + log.trace("Queue is empty, previous task has expired!"); + onFailure.accept(new RuntimeException("Queue is empty, previous task has expired!")); + return true; } return false; } + + private void executeOnTimeout() { + timeoutExecutor.execute(() -> { + while (true) { + TbTransactionTask task = timeoutQueue.peek(); + if (task != null) { + if (task.getIsCompleted()) { + timeoutQueue.poll(); + } else { + if (System.currentTimeMillis() > task.getExpirationTime()) { + log.trace("Task has expired! Deleting it...[{}] [{}]", task.getMsg().getId(), task.getMsg().getType()); + timeoutQueue.poll(); + task.getOnFailure().accept(new RuntimeException("Task has expired!")); + + BlockingQueue queue = transactionMap.get(task.getMsg().getTransactionData().getOriginatorId()); + queue.poll(); + + TbTransactionTask nextTask = queue.peek(); + if (nextTask != null) { + startTransactionTask(nextTask); + } + } else { + try { + log.trace("Task has not expired! Continue executing...[{}] [{}]", task.getMsg().getId(), task.getMsg().getType()); + TimeUnit.MILLISECONDS.sleep(duration); + } catch (InterruptedException e) { + throw new IllegalStateException("Thread interrupted", e); + } + } + } + } else { + try { + log.trace("Queue is empty, waiting for tasks!"); + TimeUnit.SECONDS.sleep(1); + } catch (InterruptedException e) { + throw new IllegalStateException("Thread interrupted", e); + } + } + } + }); + } + + private void startTransactionTask(TbTransactionTask task) { + task.setIsCompleted(false); + task.setExpirationTime(System.currentTimeMillis() + duration); + task.getOnStart().accept(task.getMsg()); + } } diff --git a/application/src/main/java/org/thingsboard/server/service/transaction/TbTransactionTask.java b/application/src/main/java/org/thingsboard/server/service/transaction/TbTransactionTask.java index 12168d9934..a56bc0557c 100644 --- a/application/src/main/java/org/thingsboard/server/service/transaction/TbTransactionTask.java +++ b/application/src/main/java/org/thingsboard/server/service/transaction/TbTransactionTask.java @@ -1,11 +1,28 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.thingsboard.server.service.transaction; +import lombok.AllArgsConstructor; import lombok.Data; import org.thingsboard.server.common.msg.TbMsg; import java.util.function.Consumer; @Data +@AllArgsConstructor public final class TbTransactionTask { private final TbMsg msg; @@ -13,4 +30,14 @@ public final class TbTransactionTask { private final Consumer onEnd; private final Consumer onFailure; + private Boolean isCompleted; + private Long expirationTime; + + public TbTransactionTask(TbMsg msg, Consumer onStart, Consumer onEnd, Consumer onFailure) { + this.msg = msg; + this.onStart = onStart; + this.onEnd = onEnd; + this.onFailure = onFailure; + this.isCompleted = false; + } } diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index a837a3e30f..157e36b57c 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -215,6 +215,8 @@ actors: transaction: # Size of queues which store messages for transaction rule nodes queue_size: "${ACTORS_RULE_TRANSACTION_QUEUE_SIZE:10}" + # Time in milliseconds for transaction to complete + duration: "${ACTORS_RULE_TRANSACTION_DURATION:10000}" statistics: # Enable/disable actor statistics enabled: "${ACTORS_STATISTICS_ENABLED:true}" diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transaction/TbTransactionBeginNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transaction/TbTransactionBeginNode.java index 8e466a3ecf..bad4585d6a 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transaction/TbTransactionBeginNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transaction/TbTransactionBeginNode.java @@ -1,12 +1,12 @@ /** * Copyright © 2016-2018 The Thingsboard Authors - *

+ * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

+ * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -54,7 +54,7 @@ public class TbTransactionBeginNode implements TbNode { @Override public void onMsg(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException, TbNodeException { - log.info("Msg in - [{}] [{}]", msg.getId(), msg.getType()); + log.trace("Msg in - [{}] [{}]", msg.getId(), msg.getType()); TbMsgTransactionData transactionData = new TbMsgTransactionData(UUID.randomUUID(), msg.getOriginator()); @@ -62,11 +62,11 @@ public class TbTransactionBeginNode implements TbNode { msg.getData(), transactionData, msg.getRuleChainId(), msg.getRuleNodeId(), msg.getClusterPartition()); ctx.getRuleChainTransactionService().beginTransaction(ctx, tbMsg, onStart -> { - log.info("Transaction starting... [{}] [{}]", tbMsg.getId(), tbMsg.getType()); + log.trace("Transaction starting... [{}] [{}]", tbMsg.getId(), tbMsg.getType()); ctx.tellNext(tbMsg, SUCCESS); - }, onEnd -> log.info("Transaction ended successfully... [{}] [{}]", tbMsg.getId(), tbMsg.getType()), + }, onEnd -> log.trace("Transaction ended successfully... [{}] [{}]", tbMsg.getId(), tbMsg.getType()), throwable -> { - log.error("Transaction failed due to queue size restriction! [{}] [{}]", tbMsg.getId(), tbMsg.getType(), throwable); + log.error("Transaction failed! [{}] [{}]", tbMsg.getId(), tbMsg.getType(), throwable); ctx.tellFailure(tbMsg, throwable); }); } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transaction/TbTransactionEndNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transaction/TbTransactionEndNode.java index c1d5cefd22..1bcd58b6e1 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transaction/TbTransactionEndNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transaction/TbTransactionEndNode.java @@ -1,12 +1,12 @@ /** * Copyright © 2016-2018 The Thingsboard Authors - *

+ * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

+ * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -55,7 +55,7 @@ public class TbTransactionEndNode implements TbNode { if (!isFailed) { ctx.tellNext(msg, SUCCESS); } - log.info("Msg out - [{}] [{}]", msg.getId(), msg.getType()); + log.trace("Msg out - [{}] [{}]", msg.getId(), msg.getType()); } @Override