diff --git a/application/src/main/java/org/thingsboard/server/service/transaction/BaseRuleChainTransactionService.java b/application/src/main/java/org/thingsboard/server/service/transaction/BaseRuleChainTransactionService.java index d0eb713215..baf58bf80c 100644 --- a/application/src/main/java/org/thingsboard/server/service/transaction/BaseRuleChainTransactionService.java +++ b/application/src/main/java/org/thingsboard/server/service/transaction/BaseRuleChainTransactionService.java @@ -22,16 +22,15 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import org.thingsboard.rule.engine.api.RuleChainTransactionService; import org.thingsboard.rule.engine.api.TbContext; -import org.thingsboard.server.common.data.EntityType; -import org.thingsboard.server.common.data.id.AssetId; -import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.id.EntityIdFactory; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.cluster.ServerAddress; import org.thingsboard.server.gen.cluster.ClusterAPIProtos; import org.thingsboard.server.service.cluster.routing.ClusterRoutingService; import org.thingsboard.server.service.cluster.rpc.ClusterRpcService; +import org.thingsboard.server.service.executors.DbCallbackExecutorService; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; @@ -39,6 +38,7 @@ import java.util.Optional; import java.util.Queue; import java.util.UUID; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentMap; @@ -60,6 +60,9 @@ public class BaseRuleChainTransactionService implements RuleChainTransactionServ @Autowired private ClusterRpcService clusterRpcService; + @Autowired + private DbCallbackExecutorService callbackExecutor; + @Value("${actors.rule.transaction.queue_size}") private int finalQueueSize; @Value("${actors.rule.transaction.duration}") @@ -70,12 +73,10 @@ public class BaseRuleChainTransactionService implements RuleChainTransactionServ private final Queue timeoutQueue = new ConcurrentLinkedQueue<>(); private ExecutorService timeoutExecutor; - private ExecutorService executor; @PostConstruct public void init() { timeoutExecutor = Executors.newSingleThreadExecutor(); - executor = Executors.newSingleThreadExecutor(); executeOnTimeout(); } @@ -84,9 +85,6 @@ public class BaseRuleChainTransactionService implements RuleChainTransactionServ if (timeoutExecutor != null) { timeoutExecutor.shutdownNow(); } - if (executor != null) { - executor.shutdownNow(); - } } @Override @@ -96,16 +94,16 @@ public class BaseRuleChainTransactionService implements RuleChainTransactionServ BlockingQueue queue = transactionMap.computeIfAbsent(msg.getTransactionData().getOriginatorId(), id -> new LinkedBlockingQueue<>(finalQueueSize)); - TbTransactionTask task = new TbTransactionTask(msg, onStart, onEnd, onFailure); + TbTransactionTask transactionTask = new TbTransactionTask(msg, onStart, onEnd, onFailure, System.currentTimeMillis() + duration); int queueSize = queue.size(); if (queueSize >= finalQueueSize) { - task.getOnFailure().accept(new RuntimeException("Queue has no space!")); + executeOnFailure(transactionTask.getOnFailure(), "Queue has no space!"); } else { - addMsgToQueues(queue, task); + addMsgToQueues(queue, transactionTask); if (queueSize == 0) { - startTransactionTask(task); + executeOnSuccess(transactionTask.getOnStart(), transactionTask.getMsg()); } else { - log.trace("Msg [{}] [{}] is waiting to start transaction!", msg.getId(), msg.getType()); + log.trace("Msg [{}][{}] is waiting to start transaction!", msg.getId(), msg.getType()); } } } finally { @@ -113,64 +111,79 @@ public class BaseRuleChainTransactionService implements RuleChainTransactionServ } } - private void addMsgToQueues(BlockingQueue queue, TbTransactionTask task) { - queue.offer(task); - timeoutQueue.offer(task); + private void addMsgToQueues(BlockingQueue queue, TbTransactionTask transactionTask) { + queue.offer(transactionTask); + timeoutQueue.offer(transactionTask); log.trace("Added msg to queue, size: [{}]", queue.size()); } @Override - public boolean endTransaction(TbContext ctx, TbMsg msg, Consumer onFailure) { - BlockingQueue queue = transactionMap.get(msg.getTransactionData().getOriginatorId()); + public void endTransaction(TbContext ctx, TbMsg msg, Consumer onSuccess, Consumer onFailure) { + EntityId originatorId = msg.getTransactionData().getOriginatorId(); - TbTransactionTask currentTask = queue.peek(); - if (currentTask != null) { - if (currentTask.getMsg().getTransactionData().getTransactionId().equals(msg.getTransactionData().getTransactionId())) { - currentTask.setIsCompleted(true); - queue.remove(); - log.trace("Removed msg from queue, size [{}]", queue.size()); - currentTask.getOnEnd().accept(currentTask.getMsg()); + if (!onRemoteTransactionEndSync(ctx.getTenantId(), originatorId)) { + transactionLock.lock(); + try { + BlockingQueue queue = transactionMap.computeIfAbsent(originatorId, id -> + new LinkedBlockingQueue<>(finalQueueSize)); - TbTransactionTask nextTask = queue.peek(); - if (nextTask != null) { - startTransactionTask(nextTask); + TbTransactionTask currentTransactionTask = queue.peek(); + if (currentTransactionTask != null) { + if (currentTransactionTask.getMsg().getTransactionData().getTransactionId().equals(msg.getTransactionData().getTransactionId())) { + currentTransactionTask.setCompleted(true); + queue.poll(); + log.trace("Removed msg from queue, size [{}]", queue.size()); + + executeOnSuccess(currentTransactionTask.getOnEnd(), currentTransactionTask.getMsg()); + executeOnSuccess(onSuccess, currentTransactionTask.getMsg()); + + TbTransactionTask nextTransactionTask = queue.peek(); + if (nextTransactionTask != null) { + executeOnSuccess(nextTransactionTask.getOnStart(), nextTransactionTask.getMsg()); + } + } else { + log.trace("Task has expired!"); + executeOnFailure(onFailure, "Task has expired!"); + } + } else { + log.trace("Queue is empty, previous task has expired!"); + executeOnFailure(onFailure, "Queue is empty, previous task has expired!"); } - } else { - log.trace("Task has expired!"); - onFailure.accept(new RuntimeException("Task has expired!")); - return true; + } finally { + transactionLock.unlock(); } - } else { - log.trace("Queue is empty, previous task has expired!"); - onFailure.accept(new RuntimeException("Queue is empty, previous task has expired!")); - return true; } - return false; } private void executeOnTimeout() { timeoutExecutor.submit(() -> { while (true) { - TbTransactionTask task = timeoutQueue.peek(); - if (task != null) { - if (task.getIsCompleted()) { + TbTransactionTask transactionTask = timeoutQueue.peek(); + if (transactionTask != null) { + if (transactionTask.isCompleted()) { timeoutQueue.poll(); } else { - if (System.currentTimeMillis() > task.getExpirationTime()) { - log.trace("Task has expired! Deleting it...[{}] [{}]", task.getMsg().getId(), task.getMsg().getType()); - timeoutQueue.poll(); - task.getOnFailure().accept(new RuntimeException("Task has expired!")); + if (System.currentTimeMillis() > transactionTask.getExpirationTime()) { + transactionLock.lock(); + try { + log.trace("Task has expired! Deleting it...[{}][{}]", transactionTask.getMsg().getId(), transactionTask.getMsg().getType()); + timeoutQueue.poll(); + executeOnFailure(transactionTask.getOnFailure(), "Task has expired!"); - BlockingQueue queue = transactionMap.get(task.getMsg().getTransactionData().getOriginatorId()); - queue.poll(); - - TbTransactionTask nextTask = queue.peek(); - if (nextTask != null) { - startTransactionTask(nextTask); + BlockingQueue queue = transactionMap.get(transactionTask.getMsg().getTransactionData().getOriginatorId()); + if (queue != null) { + queue.poll(); + TbTransactionTask nextTransactionTask = queue.peek(); + if (nextTransactionTask != null) { + executeOnSuccess(nextTransactionTask.getOnStart(), nextTransactionTask.getMsg()); + } + } + } finally { + transactionLock.unlock(); } } else { try { - log.trace("Task has not expired! Continue executing...[{}] [{}]", task.getMsg().getId(), task.getMsg().getType()); + log.trace("Task has not expired! Continue executing...[{}][{}]", transactionTask.getMsg().getId(), transactionTask.getMsg().getType()); TimeUnit.MILLISECONDS.sleep(duration); } catch (InterruptedException e) { throw new IllegalStateException("Thread interrupted", e); @@ -189,10 +202,22 @@ public class BaseRuleChainTransactionService implements RuleChainTransactionServ }); } - private void startTransactionTask(TbTransactionTask task) { - task.setIsCompleted(false); - task.setExpirationTime(System.currentTimeMillis() + duration); - task.getOnStart().accept(task.getMsg()); + private void executeOnFailure(Consumer onFailure, String exception) { + executeCallback(() -> { + onFailure.accept(new RuntimeException(exception)); + return null; + }); + } + + private void executeOnSuccess(Consumer onSuccess, TbMsg tbMsg) { + executeCallback(() -> { + onSuccess.accept(tbMsg); + return null; + }); + } + + private void executeCallback(Callable task) { + callbackExecutor.executeAsync(task); } @Override @@ -204,25 +229,21 @@ public class BaseRuleChainTransactionService implements RuleChainTransactionServ throw new RuntimeException(e); } TenantId tenantId = new TenantId(new UUID(proto.getTenantIdMSB(), proto.getTenantIdLSB())); - - String entityTypeStr = proto.getEntityType(); - EntityId entityId; - if (entityTypeStr.equals(EntityType.ASSET.name())) { - entityId = new AssetId(new UUID(proto.getOriginatorIdMSB(), proto.getOriginatorIdLSB())); - } else { - entityId = new DeviceId(new UUID(proto.getOriginatorIdMSB(), proto.getOriginatorIdLSB())); - } + EntityId entityId = EntityIdFactory.getByTypeAndUuid(proto.getEntityType(), new UUID(proto.getOriginatorIdMSB(), proto.getOriginatorIdLSB())); onTransactionEnd(tenantId, entityId); } - @Override - public void onTransactionEnd(TenantId tenantId, EntityId entityId) { - executor.submit(() -> onTransactionEndSync(tenantId, entityId)); + private void onTransactionEnd(TenantId tenantId, EntityId entityId) { + callbackExecutor.executeAsync(() -> onRemoteTransactionEndSync(tenantId, entityId)); } - private void onTransactionEndSync(TenantId tenantId, EntityId entityId) { + private boolean onRemoteTransactionEndSync(TenantId tenantId, EntityId entityId) { Optional address = routingService.resolveById(entityId); - address.ifPresent(serverAddress -> sendTransactionEvent(tenantId, entityId, serverAddress)); + if (address.isPresent()) { + sendTransactionEvent(tenantId, entityId, address.get()); + return true; + } + return false; } private void sendTransactionEvent(TenantId tenantId, EntityId entityId, ServerAddress address) { diff --git a/application/src/main/java/org/thingsboard/server/service/transaction/TbTransactionTask.java b/application/src/main/java/org/thingsboard/server/service/transaction/TbTransactionTask.java index a56bc0557c..49e29bac16 100644 --- a/application/src/main/java/org/thingsboard/server/service/transaction/TbTransactionTask.java +++ b/application/src/main/java/org/thingsboard/server/service/transaction/TbTransactionTask.java @@ -29,15 +29,16 @@ public final class TbTransactionTask { private final Consumer onStart; private final Consumer onEnd; private final Consumer onFailure; + private final long expirationTime; - private Boolean isCompleted; - private Long expirationTime; + private boolean isCompleted; - public TbTransactionTask(TbMsg msg, Consumer onStart, Consumer onEnd, Consumer onFailure) { + public TbTransactionTask(TbMsg msg, Consumer onStart, Consumer onEnd, Consumer onFailure, long expirationTime) { this.msg = msg; this.onStart = onStart; this.onEnd = onEnd; this.onFailure = onFailure; + this.expirationTime = expirationTime; this.isCompleted = false; } } diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleChainTransactionService.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleChainTransactionService.java index 51aebea1dd..dbfc0211cf 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleChainTransactionService.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleChainTransactionService.java @@ -15,8 +15,6 @@ */ package org.thingsboard.rule.engine.api; -import org.thingsboard.server.common.data.id.EntityId; -import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.cluster.ServerAddress; @@ -26,10 +24,8 @@ public interface RuleChainTransactionService { void beginTransaction(TbContext ctx, TbMsg msg, Consumer onStart, Consumer onEnd, Consumer onFailure); - boolean endTransaction(TbContext ctx, TbMsg msg, Consumer onFailure); + void endTransaction(TbContext ctx, TbMsg msg, Consumer onSuccess, Consumer onFailure); void onRemoteTransactionMsg(ServerAddress serverAddress, byte[] bytes); - void onTransactionEnd(TenantId tenantId, EntityId entityId); - } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transaction/TbTransactionBeginNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transaction/TbTransactionBeginNode.java index 2452f71f5e..b086140dd3 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transaction/TbTransactionBeginNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transaction/TbTransactionBeginNode.java @@ -28,7 +28,6 @@ import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgDataType; import org.thingsboard.server.common.msg.TbMsgTransactionData; -import java.util.UUID; import java.util.concurrent.ExecutionException; import static org.thingsboard.rule.engine.api.TbRelationTypes.SUCCESS; @@ -54,19 +53,18 @@ public class TbTransactionBeginNode implements TbNode { @Override public void onMsg(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException, TbNodeException { - log.trace("Msg enters transaction - [{}] [{}]", msg.getId(), msg.getType()); + log.trace("Msg enters transaction - [{}][{}]", msg.getId(), msg.getType()); TbMsgTransactionData transactionData = new TbMsgTransactionData(msg.getId(), msg.getOriginator()); - TbMsg tbMsg = new TbMsg(msg.getId(), msg.getType(), msg.getOriginator(), msg.getMetaData(), TbMsgDataType.JSON, msg.getData(), transactionData, msg.getRuleChainId(), msg.getRuleNodeId(), msg.getClusterPartition()); - ctx.getRuleChainTransactionService().beginTransaction(ctx, tbMsg, onStart -> { - log.trace("Transaction starting... [{}] [{}]", tbMsg.getId(), tbMsg.getType()); - ctx.tellNext(tbMsg, SUCCESS); - }, onEnd -> log.trace("Transaction ended successfully... [{}] [{}]", tbMsg.getId(), tbMsg.getType()), + ctx.getRuleChainTransactionService().beginTransaction(ctx, tbMsg, startMsg -> { + log.trace("Transaction starting... [{}][{}]", startMsg.getId(), startMsg.getType()); + ctx.tellNext(startMsg, SUCCESS); + }, endMsg -> log.trace("Transaction ended successfully... [{}][{}]", endMsg.getId(), endMsg.getType()), throwable -> { - log.error("Transaction failed! [{}] [{}]", tbMsg.getId(), tbMsg.getType(), throwable); + log.error("Transaction failed! [{}][{}]", tbMsg.getId(), tbMsg.getType(), throwable); ctx.tellFailure(tbMsg, throwable); }); } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transaction/TbTransactionEndNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transaction/TbTransactionEndNode.java index dc6ce6e914..ea98bccc9e 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transaction/TbTransactionEndNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transaction/TbTransactionEndNode.java @@ -51,12 +51,10 @@ public class TbTransactionEndNode implements TbNode { @Override public void onMsg(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException, TbNodeException { - ctx.getRuleChainTransactionService().onTransactionEnd(ctx.getTenantId(), msg.getTransactionData().getOriginatorId()); - boolean isFailed = ctx.getRuleChainTransactionService().endTransaction(ctx, msg, throwable -> ctx.tellFailure(msg, throwable)); - if (!isFailed) { - ctx.tellNext(msg, SUCCESS); - } - log.trace("Msg left transaction - [{}] [{}]", msg.getId(), msg.getType()); + ctx.getRuleChainTransactionService().endTransaction(ctx, msg, + successMsg -> ctx.tellNext(successMsg, SUCCESS), + throwable -> ctx.tellFailure(msg, throwable)); + log.trace("Msg left transaction - [{}][{}]", msg.getId(), msg.getType()); } @Override