transaction service improvements, added timeout queue
This commit is contained in:
		
							parent
							
								
									6686003c5a
								
							
						
					
					
						commit
						6ae751c76c
					
				@ -279,9 +279,6 @@ public class TelemetryController extends BaseController {
 | 
			
		||||
            deleteFromTs = 0L;
 | 
			
		||||
            deleteToTs = System.currentTimeMillis();
 | 
			
		||||
        } else {
 | 
			
		||||
            if (startTs == null || endTs == null) {
 | 
			
		||||
                return getImmediateDeferredResult("StartTs and endTs could not be empty when deleteAllDataForKeys equals [false]", HttpStatus.BAD_REQUEST);
 | 
			
		||||
            }
 | 
			
		||||
            deleteFromTs = startTs;
 | 
			
		||||
            deleteToTs = endTs;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
@ -1,12 +1,12 @@
 | 
			
		||||
/**
 | 
			
		||||
 * Copyright © 2016-2018 The Thingsboard Authors
 | 
			
		||||
 * <p>
 | 
			
		||||
 *
 | 
			
		||||
 * Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
 * you may not use this file except in compliance with the License.
 | 
			
		||||
 * You may obtain a copy of the License at
 | 
			
		||||
 * <p>
 | 
			
		||||
 * http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 * <p>
 | 
			
		||||
 *
 | 
			
		||||
 *     http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 *
 | 
			
		||||
 * Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
@ -23,10 +23,17 @@ import org.thingsboard.rule.engine.api.TbContext;
 | 
			
		||||
import org.thingsboard.server.common.data.id.EntityId;
 | 
			
		||||
import org.thingsboard.server.common.msg.TbMsg;
 | 
			
		||||
 | 
			
		||||
import javax.annotation.PostConstruct;
 | 
			
		||||
import javax.annotation.PreDestroy;
 | 
			
		||||
import java.util.Queue;
 | 
			
		||||
import java.util.concurrent.BlockingQueue;
 | 
			
		||||
import java.util.concurrent.ConcurrentHashMap;
 | 
			
		||||
import java.util.concurrent.ConcurrentLinkedQueue;
 | 
			
		||||
import java.util.concurrent.ConcurrentMap;
 | 
			
		||||
import java.util.concurrent.ExecutorService;
 | 
			
		||||
import java.util.concurrent.Executors;
 | 
			
		||||
import java.util.concurrent.LinkedBlockingQueue;
 | 
			
		||||
import java.util.concurrent.TimeUnit;
 | 
			
		||||
import java.util.concurrent.locks.Lock;
 | 
			
		||||
import java.util.concurrent.locks.ReentrantLock;
 | 
			
		||||
import java.util.function.Consumer;
 | 
			
		||||
@ -37,28 +44,45 @@ public class BaseRuleChainTransactionService implements RuleChainTransactionServ
 | 
			
		||||
 | 
			
		||||
    @Value("${actors.rule.transaction.queue_size}")
 | 
			
		||||
    private int finalQueueSize;
 | 
			
		||||
    @Value("${actors.rule.transaction.duration}")
 | 
			
		||||
    private long duration;
 | 
			
		||||
 | 
			
		||||
    private final Lock transactionLock = new ReentrantLock();
 | 
			
		||||
    private final ConcurrentMap<EntityId, BlockingQueue<TbTransactionTask>> transactionMap = new ConcurrentHashMap<>();
 | 
			
		||||
    private final Queue<TbTransactionTask> timeoutQueue = new ConcurrentLinkedQueue<>();
 | 
			
		||||
 | 
			
		||||
    //TODO: add delete on timeout from queue -> onFailure accept
 | 
			
		||||
    private ExecutorService timeoutExecutor;
 | 
			
		||||
 | 
			
		||||
    @PostConstruct
 | 
			
		||||
    public void init() {
 | 
			
		||||
        timeoutExecutor = Executors.newSingleThreadExecutor();
 | 
			
		||||
        executeOnTimeout();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @PreDestroy
 | 
			
		||||
    public void destroy() {
 | 
			
		||||
        if (timeoutExecutor != null) {
 | 
			
		||||
            timeoutExecutor.shutdownNow();
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void beginTransaction(TbContext ctx, TbMsg msg, Consumer<TbMsg> onStart, Consumer<TbMsg> onEnd, Consumer<Throwable> onFailure) {
 | 
			
		||||
        BlockingQueue<TbTransactionTask> queue = transactionMap.computeIfAbsent(msg.getTransactionData().getOriginatorId(), id ->
 | 
			
		||||
                new LinkedBlockingQueue<>(finalQueueSize));
 | 
			
		||||
        transactionLock.lock();
 | 
			
		||||
        try {
 | 
			
		||||
            BlockingQueue<TbTransactionTask> queue = transactionMap.computeIfAbsent(msg.getTransactionData().getOriginatorId(), id ->
 | 
			
		||||
                    new LinkedBlockingQueue<>(finalQueueSize));
 | 
			
		||||
 | 
			
		||||
            TbTransactionTask task = new TbTransactionTask(msg, onStart, onEnd, onFailure);
 | 
			
		||||
            int queueSize = queue.size();
 | 
			
		||||
            if (queueSize >= finalQueueSize) {
 | 
			
		||||
                onFailure.accept(new RuntimeException("Queue has no space!"));
 | 
			
		||||
                task.getOnFailure().accept(new RuntimeException("Queue has no space!"));
 | 
			
		||||
            } else {
 | 
			
		||||
                addMsgToQueue(queue, task, onFailure);
 | 
			
		||||
                addMsgToQueues(queue, task);
 | 
			
		||||
                if (queueSize == 0) {
 | 
			
		||||
                    onStart.accept(msg);
 | 
			
		||||
                    startTransactionTask(task);
 | 
			
		||||
                } else {
 | 
			
		||||
                    log.info("Msg [{}] [{}] is waiting to start transaction!", msg.getId(), msg.getType());
 | 
			
		||||
                    log.trace("Msg [{}] [{}] is waiting to start transaction!", msg.getId(), msg.getType());
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
        } finally {
 | 
			
		||||
@ -66,41 +90,85 @@ public class BaseRuleChainTransactionService implements RuleChainTransactionServ
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void addMsgToQueue(BlockingQueue<TbTransactionTask> queue, TbTransactionTask task, Consumer<Throwable> onFailure) {
 | 
			
		||||
        try {
 | 
			
		||||
            queue.add(task);
 | 
			
		||||
            log.info("Added msg to queue, size: [{}]", queue.size());
 | 
			
		||||
        } catch (Exception e) {
 | 
			
		||||
            log.error("Error when trying to add msg [{}] to the queue", task.getMsg(), e);
 | 
			
		||||
            onFailure.accept(e);
 | 
			
		||||
        }
 | 
			
		||||
    private void addMsgToQueues(BlockingQueue<TbTransactionTask> queue, TbTransactionTask task) {
 | 
			
		||||
        queue.offer(task);
 | 
			
		||||
        timeoutQueue.offer(task);
 | 
			
		||||
        log.trace("Added msg to queue, size: [{}]", queue.size());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public boolean endTransaction(TbContext ctx, TbMsg msg, Consumer<Throwable> onFailure) {
 | 
			
		||||
        transactionLock.lock();
 | 
			
		||||
        try {
 | 
			
		||||
            BlockingQueue<TbTransactionTask> queue = transactionMap.get(msg.getTransactionData().getOriginatorId());
 | 
			
		||||
            try {
 | 
			
		||||
                TbTransactionTask currentTask = queue.element();
 | 
			
		||||
                if (currentTask.getMsg().getTransactionData().getTransactionId().equals(msg.getTransactionData().getTransactionId())) {
 | 
			
		||||
                    queue.remove();
 | 
			
		||||
                    log.info("Removed msg from queue, size [{}]", queue.size());
 | 
			
		||||
                    currentTask.getOnEnd().accept(currentTask.getMsg());
 | 
			
		||||
        BlockingQueue<TbTransactionTask> queue = transactionMap.get(msg.getTransactionData().getOriginatorId());
 | 
			
		||||
 | 
			
		||||
                    TbTransactionTask nextTask = queue.peek();
 | 
			
		||||
                    if (nextTask != null) {
 | 
			
		||||
                        nextTask.getOnStart().accept(nextTask.getMsg());
 | 
			
		||||
                    }
 | 
			
		||||
        TbTransactionTask currentTask = queue.peek();
 | 
			
		||||
        if (currentTask != null) {
 | 
			
		||||
            if (currentTask.getMsg().getTransactionData().getTransactionId().equals(msg.getTransactionData().getTransactionId())) {
 | 
			
		||||
                currentTask.setIsCompleted(true);
 | 
			
		||||
                queue.remove();
 | 
			
		||||
                log.trace("Removed msg from queue, size [{}]", queue.size());
 | 
			
		||||
                currentTask.getOnEnd().accept(currentTask.getMsg());
 | 
			
		||||
 | 
			
		||||
                TbTransactionTask nextTask = queue.peek();
 | 
			
		||||
                if (nextTask != null) {
 | 
			
		||||
                    startTransactionTask(nextTask);
 | 
			
		||||
                }
 | 
			
		||||
            } catch (Exception e) {
 | 
			
		||||
                log.error("Queue is empty!", queue);
 | 
			
		||||
                onFailure.accept(e);
 | 
			
		||||
            } else {
 | 
			
		||||
                log.trace("Task has expired!");
 | 
			
		||||
                onFailure.accept(new RuntimeException("Task has expired!"));
 | 
			
		||||
                return true;
 | 
			
		||||
            }
 | 
			
		||||
        } finally {
 | 
			
		||||
            transactionLock.unlock();
 | 
			
		||||
        } else {
 | 
			
		||||
            log.trace("Queue is empty, previous task has expired!");
 | 
			
		||||
            onFailure.accept(new RuntimeException("Queue is empty, previous task has expired!"));
 | 
			
		||||
            return true;
 | 
			
		||||
        }
 | 
			
		||||
        return false;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void executeOnTimeout() {
 | 
			
		||||
        timeoutExecutor.execute(() -> {
 | 
			
		||||
            while (true) {
 | 
			
		||||
                TbTransactionTask task = timeoutQueue.peek();
 | 
			
		||||
                if (task != null) {
 | 
			
		||||
                    if (task.getIsCompleted()) {
 | 
			
		||||
                        timeoutQueue.poll();
 | 
			
		||||
                    } else {
 | 
			
		||||
                        if (System.currentTimeMillis() > task.getExpirationTime()) {
 | 
			
		||||
                            log.trace("Task has expired! Deleting it...[{}] [{}]", task.getMsg().getId(), task.getMsg().getType());
 | 
			
		||||
                            timeoutQueue.poll();
 | 
			
		||||
                            task.getOnFailure().accept(new RuntimeException("Task has expired!"));
 | 
			
		||||
 | 
			
		||||
                            BlockingQueue<TbTransactionTask> queue = transactionMap.get(task.getMsg().getTransactionData().getOriginatorId());
 | 
			
		||||
                            queue.poll();
 | 
			
		||||
 | 
			
		||||
                            TbTransactionTask nextTask = queue.peek();
 | 
			
		||||
                            if (nextTask != null) {
 | 
			
		||||
                                startTransactionTask(nextTask);
 | 
			
		||||
                            }
 | 
			
		||||
                        } else {
 | 
			
		||||
                            try {
 | 
			
		||||
                                log.trace("Task has not expired! Continue executing...[{}] [{}]", task.getMsg().getId(), task.getMsg().getType());
 | 
			
		||||
                                TimeUnit.MILLISECONDS.sleep(duration);
 | 
			
		||||
                            } catch (InterruptedException e) {
 | 
			
		||||
                                throw new IllegalStateException("Thread interrupted", e);
 | 
			
		||||
                            }
 | 
			
		||||
                        }
 | 
			
		||||
                    }
 | 
			
		||||
                } else {
 | 
			
		||||
                    try {
 | 
			
		||||
                        log.trace("Queue is empty, waiting for tasks!");
 | 
			
		||||
                        TimeUnit.SECONDS.sleep(1);
 | 
			
		||||
                    } catch (InterruptedException e) {
 | 
			
		||||
                        throw new IllegalStateException("Thread interrupted", e);
 | 
			
		||||
                    }
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
        });
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void startTransactionTask(TbTransactionTask task) {
 | 
			
		||||
        task.setIsCompleted(false);
 | 
			
		||||
        task.setExpirationTime(System.currentTimeMillis() + duration);
 | 
			
		||||
        task.getOnStart().accept(task.getMsg());
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -1,11 +1,28 @@
 | 
			
		||||
/**
 | 
			
		||||
 * Copyright © 2016-2018 The Thingsboard Authors
 | 
			
		||||
 *
 | 
			
		||||
 * Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
 * you may not use this file except in compliance with the License.
 | 
			
		||||
 * You may obtain a copy of the License at
 | 
			
		||||
 *
 | 
			
		||||
 *     http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 *
 | 
			
		||||
 * Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
 * See the License for the specific language governing permissions and
 | 
			
		||||
 * limitations under the License.
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.server.service.transaction;
 | 
			
		||||
 | 
			
		||||
import lombok.AllArgsConstructor;
 | 
			
		||||
import lombok.Data;
 | 
			
		||||
import org.thingsboard.server.common.msg.TbMsg;
 | 
			
		||||
 | 
			
		||||
import java.util.function.Consumer;
 | 
			
		||||
 | 
			
		||||
@Data
 | 
			
		||||
@AllArgsConstructor
 | 
			
		||||
public final class TbTransactionTask {
 | 
			
		||||
 | 
			
		||||
    private final TbMsg msg;
 | 
			
		||||
@ -13,4 +30,14 @@ public final class TbTransactionTask {
 | 
			
		||||
    private final Consumer<TbMsg> onEnd;
 | 
			
		||||
    private final Consumer<Throwable> onFailure;
 | 
			
		||||
 | 
			
		||||
    private Boolean isCompleted;
 | 
			
		||||
    private Long expirationTime;
 | 
			
		||||
 | 
			
		||||
    public TbTransactionTask(TbMsg msg, Consumer<TbMsg> onStart, Consumer<TbMsg> onEnd, Consumer<Throwable> onFailure) {
 | 
			
		||||
        this.msg = msg;
 | 
			
		||||
        this.onStart = onStart;
 | 
			
		||||
        this.onEnd = onEnd;
 | 
			
		||||
        this.onFailure = onFailure;
 | 
			
		||||
        this.isCompleted = false;
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -215,6 +215,8 @@ actors:
 | 
			
		||||
    transaction:
 | 
			
		||||
      # Size of queues which store messages for transaction rule nodes
 | 
			
		||||
      queue_size: "${ACTORS_RULE_TRANSACTION_QUEUE_SIZE:10}"
 | 
			
		||||
      # Time in milliseconds for transaction to complete
 | 
			
		||||
      duration: "${ACTORS_RULE_TRANSACTION_DURATION:10000}"
 | 
			
		||||
  statistics:
 | 
			
		||||
    # Enable/disable actor statistics
 | 
			
		||||
    enabled: "${ACTORS_STATISTICS_ENABLED:true}"
 | 
			
		||||
 | 
			
		||||
@ -1,12 +1,12 @@
 | 
			
		||||
/**
 | 
			
		||||
 * Copyright © 2016-2018 The Thingsboard Authors
 | 
			
		||||
 * <p>
 | 
			
		||||
 *
 | 
			
		||||
 * Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
 * you may not use this file except in compliance with the License.
 | 
			
		||||
 * You may obtain a copy of the License at
 | 
			
		||||
 * <p>
 | 
			
		||||
 * http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 * <p>
 | 
			
		||||
 *
 | 
			
		||||
 *     http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 *
 | 
			
		||||
 * Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
@ -54,7 +54,7 @@ public class TbTransactionBeginNode implements TbNode {
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void onMsg(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException, TbNodeException {
 | 
			
		||||
        log.info("Msg in - [{}] [{}]", msg.getId(), msg.getType());
 | 
			
		||||
        log.trace("Msg in - [{}] [{}]", msg.getId(), msg.getType());
 | 
			
		||||
 | 
			
		||||
        TbMsgTransactionData transactionData = new TbMsgTransactionData(UUID.randomUUID(), msg.getOriginator());
 | 
			
		||||
 | 
			
		||||
@ -62,11 +62,11 @@ public class TbTransactionBeginNode implements TbNode {
 | 
			
		||||
                msg.getData(), transactionData, msg.getRuleChainId(), msg.getRuleNodeId(), msg.getClusterPartition());
 | 
			
		||||
 | 
			
		||||
        ctx.getRuleChainTransactionService().beginTransaction(ctx, tbMsg, onStart -> {
 | 
			
		||||
                    log.info("Transaction starting... [{}] [{}]", tbMsg.getId(), tbMsg.getType());
 | 
			
		||||
                    log.trace("Transaction starting... [{}] [{}]", tbMsg.getId(), tbMsg.getType());
 | 
			
		||||
                    ctx.tellNext(tbMsg, SUCCESS);
 | 
			
		||||
                }, onEnd -> log.info("Transaction ended successfully... [{}] [{}]", tbMsg.getId(), tbMsg.getType()),
 | 
			
		||||
                }, onEnd -> log.trace("Transaction ended successfully... [{}] [{}]", tbMsg.getId(), tbMsg.getType()),
 | 
			
		||||
                throwable -> {
 | 
			
		||||
                    log.error("Transaction failed due to queue size restriction! [{}] [{}]", tbMsg.getId(), tbMsg.getType(), throwable);
 | 
			
		||||
                    log.error("Transaction failed! [{}] [{}]", tbMsg.getId(), tbMsg.getType(), throwable);
 | 
			
		||||
                    ctx.tellFailure(tbMsg, throwable);
 | 
			
		||||
                });
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -1,12 +1,12 @@
 | 
			
		||||
/**
 | 
			
		||||
 * Copyright © 2016-2018 The Thingsboard Authors
 | 
			
		||||
 * <p>
 | 
			
		||||
 *
 | 
			
		||||
 * Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
 * you may not use this file except in compliance with the License.
 | 
			
		||||
 * You may obtain a copy of the License at
 | 
			
		||||
 * <p>
 | 
			
		||||
 * http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 * <p>
 | 
			
		||||
 *
 | 
			
		||||
 *     http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 *
 | 
			
		||||
 * Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
@ -55,7 +55,7 @@ public class TbTransactionEndNode implements TbNode {
 | 
			
		||||
        if (!isFailed) {
 | 
			
		||||
            ctx.tellNext(msg, SUCCESS);
 | 
			
		||||
        }
 | 
			
		||||
        log.info("Msg out - [{}] [{}]", msg.getId(), msg.getType());
 | 
			
		||||
        log.trace("Msg out - [{}] [{}]", msg.getId(), msg.getType());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user