diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java index 6ed400a6ef..a2e0aedbcf 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java @@ -34,6 +34,7 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Component; import org.thingsboard.rule.engine.api.MailService; +import org.thingsboard.rule.engine.api.RuleChainTransactionService; import org.thingsboard.server.actors.service.ActorService; import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.Event; @@ -222,6 +223,11 @@ public class ActorSystemContext { @Getter private RuleEngineTransportService ruleEngineTransportService; + @Lazy + @Autowired + @Getter + private RuleChainTransactionService ruleChainTransactionService; + @Value("${cluster.partition_id}") @Getter private long queuePartitionId; diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java index 27a766e68c..78dc1f51b8 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java @@ -20,6 +20,7 @@ import com.datastax.driver.core.utils.UUIDs; import org.springframework.util.StringUtils; import org.thingsboard.rule.engine.api.ListeningExecutor; import org.thingsboard.rule.engine.api.MailService; +import org.thingsboard.rule.engine.api.RuleChainTransactionService; import org.thingsboard.rule.engine.api.RuleEngineDeviceRpcRequest; import org.thingsboard.rule.engine.api.RuleEngineDeviceRpcResponse; import org.thingsboard.rule.engine.api.RuleEngineRpcService; @@ -124,7 +125,7 @@ class DefaultTbContext implements TbContext { @Override public TbMsg transformMsg(TbMsg origMsg, String type, EntityId originator, TbMsgMetaData metaData, String data) { - return new TbMsg(origMsg.getId(), type, originator, metaData.copy(), data, origMsg.getRuleChainId(), origMsg.getRuleNodeId(), mainCtx.getQueuePartitionId()); + return new TbMsg(origMsg.getId(), type, originator, metaData.copy(), origMsg.getDataType(), data, origMsg.getTransactionData(), origMsg.getRuleChainId(), origMsg.getRuleNodeId(), mainCtx.getQueuePartitionId()); } @Override @@ -232,6 +233,11 @@ class DefaultTbContext implements TbContext { return mainCtx.getEntityViewService(); } + @Override + public RuleChainTransactionService getRuleChainTransactionService() { + return mainCtx.getRuleChainTransactionService(); + } + @Override public MailService getMailService() { if (mainCtx.isAllowSystemMailService()) { diff --git a/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java b/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java index 50949c1578..e32e62227d 100644 --- a/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java +++ b/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java @@ -56,12 +56,8 @@ import scala.concurrent.duration.Duration; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; - -import java.util.Arrays; -import java.util.UUID; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; import static org.thingsboard.server.gen.cluster.ClusterAPIProtos.MessageType.CLUSTER_ACTOR_MESSAGE; @@ -235,6 +231,9 @@ public class DefaultActorService implements ActorService { case CLUSTER_DEVICE_STATE_SERVICE_MESSAGE: actorContext.getDeviceStateService().onRemoteMsg(serverAddress, msg.getPayload().toByteArray()); break; + case CLUSTER_TRANSACTION_SERVICE_MESSAGE: + actorContext.getRuleChainTransactionService().onRemoteTransactionMsg(serverAddress, msg.getPayload().toByteArray()); + break; } } diff --git a/application/src/main/java/org/thingsboard/server/service/state/DeviceStateService.java b/application/src/main/java/org/thingsboard/server/service/state/DeviceStateService.java index dab9e69f6e..28c92eb46b 100644 --- a/application/src/main/java/org/thingsboard/server/service/state/DeviceStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/state/DeviceStateService.java @@ -19,8 +19,6 @@ import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.msg.cluster.ServerAddress; -import java.util.Optional; - /** * Created by ashvayka on 01.05.18. */ diff --git a/application/src/main/java/org/thingsboard/server/service/transaction/BaseRuleChainTransactionService.java b/application/src/main/java/org/thingsboard/server/service/transaction/BaseRuleChainTransactionService.java new file mode 100644 index 0000000000..d7aa7664b5 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/transaction/BaseRuleChainTransactionService.java @@ -0,0 +1,255 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.transaction; + +import com.google.protobuf.InvalidProtocolBufferException; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Service; +import org.thingsboard.rule.engine.api.RuleChainTransactionService; +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.id.EntityIdFactory; +import org.thingsboard.server.common.msg.TbMsg; +import org.thingsboard.server.common.msg.cluster.ServerAddress; +import org.thingsboard.server.gen.cluster.ClusterAPIProtos; +import org.thingsboard.server.service.cluster.routing.ClusterRoutingService; +import org.thingsboard.server.service.cluster.rpc.ClusterRpcService; +import org.thingsboard.server.service.executors.DbCallbackExecutorService; + +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; +import java.util.Optional; +import java.util.Queue; +import java.util.UUID; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Consumer; + +@Service +@Slf4j +public class BaseRuleChainTransactionService implements RuleChainTransactionService { + + @Autowired + private ClusterRoutingService routingService; + + @Autowired + private ClusterRpcService clusterRpcService; + + @Autowired + private DbCallbackExecutorService callbackExecutor; + + @Value("${actors.rule.transaction.queue_size}") + private int finalQueueSize; + @Value("${actors.rule.transaction.duration}") + private long duration; + + private final Lock transactionLock = new ReentrantLock(); + private final ConcurrentMap> transactionMap = new ConcurrentHashMap<>(); + private final Queue timeoutQueue = new ConcurrentLinkedQueue<>(); + + private ExecutorService timeoutExecutor; + + @PostConstruct + public void init() { + timeoutExecutor = Executors.newSingleThreadExecutor(); + executeOnTimeout(); + } + + @PreDestroy + public void destroy() { + if (timeoutExecutor != null) { + timeoutExecutor.shutdownNow(); + } + } + + @Override + public void beginTransaction(TbMsg msg, Consumer onStart, Consumer onEnd, Consumer onFailure) { + transactionLock.lock(); + try { + BlockingQueue queue = transactionMap.computeIfAbsent(msg.getTransactionData().getOriginatorId(), id -> + new LinkedBlockingQueue<>(finalQueueSize)); + + TbTransactionTask transactionTask = new TbTransactionTask(msg, onStart, onEnd, onFailure, System.currentTimeMillis() + duration); + int queueSize = queue.size(); + if (queueSize >= finalQueueSize) { + executeOnFailure(transactionTask.getOnFailure(), "Queue has no space!"); + } else { + addMsgToQueues(queue, transactionTask); + if (queueSize == 0) { + executeOnSuccess(transactionTask.getOnStart(), transactionTask.getMsg()); + } else { + log.trace("Msg [{}][{}] is waiting to start transaction!", msg.getId(), msg.getType()); + } + } + } finally { + transactionLock.unlock(); + } + } + + @Override + public void endTransaction(TbMsg msg, Consumer onSuccess, Consumer onFailure) { + EntityId originatorId = msg.getTransactionData().getOriginatorId(); + UUID transactionId = msg.getTransactionData().getTransactionId(); + + Optional address = routingService.resolveById(originatorId); + if (address.isPresent()) { + sendTransactionEventToRemoteServer(originatorId, transactionId, address.get()); + executeOnSuccess(onSuccess, msg); + } else { + endLocalTransaction(transactionId, originatorId, onSuccess, onFailure); + } + } + + @Override + public void onRemoteTransactionMsg(ServerAddress serverAddress, byte[] data) { + ClusterAPIProtos.TransactionEndServiceMsgProto proto; + try { + proto = ClusterAPIProtos.TransactionEndServiceMsgProto.parseFrom(data); + } catch (InvalidProtocolBufferException e) { + throw new RuntimeException(e); + } + EntityId originatorId = EntityIdFactory.getByTypeAndUuid(proto.getEntityType(), new UUID(proto.getOriginatorIdMSB(), proto.getOriginatorIdLSB())); + UUID transactionId = new UUID(proto.getTransactionIdMSB(), proto.getTransactionIdLSB()); + endLocalTransaction(transactionId, originatorId, msg -> { + }, error -> { + }); + } + + private void addMsgToQueues(BlockingQueue queue, TbTransactionTask transactionTask) { + queue.offer(transactionTask); + timeoutQueue.offer(transactionTask); + log.trace("Added msg to queue, size: [{}]", queue.size()); + } + + private void endLocalTransaction(UUID transactionId, EntityId originatorId, Consumer onSuccess, Consumer onFailure) { + transactionLock.lock(); + try { + BlockingQueue queue = transactionMap.computeIfAbsent(originatorId, id -> + new LinkedBlockingQueue<>(finalQueueSize)); + + TbTransactionTask currentTransactionTask = queue.peek(); + if (currentTransactionTask != null) { + if (currentTransactionTask.getMsg().getTransactionData().getTransactionId().equals(transactionId)) { + currentTransactionTask.setCompleted(true); + queue.poll(); + log.trace("Removed msg from queue, size [{}]", queue.size()); + + executeOnSuccess(currentTransactionTask.getOnEnd(), currentTransactionTask.getMsg()); + executeOnSuccess(onSuccess, currentTransactionTask.getMsg()); + + TbTransactionTask nextTransactionTask = queue.peek(); + if (nextTransactionTask != null) { + executeOnSuccess(nextTransactionTask.getOnStart(), nextTransactionTask.getMsg()); + } + } else { + log.trace("Task has expired!"); + executeOnFailure(onFailure, "Task has expired!"); + } + } else { + log.trace("Queue is empty, previous task has expired!"); + executeOnFailure(onFailure, "Queue is empty, previous task has expired!"); + } + } finally { + transactionLock.unlock(); + } + } + + private void executeOnTimeout() { + timeoutExecutor.submit(() -> { + while (true) { + TbTransactionTask transactionTask = timeoutQueue.peek(); + if (transactionTask != null) { + transactionLock.lock(); + try { + if (transactionTask.isCompleted()) { + timeoutQueue.poll(); + } else { + if (System.currentTimeMillis() > transactionTask.getExpirationTime()) { + log.trace("Task has expired! Deleting it...[{}][{}]", transactionTask.getMsg().getId(), transactionTask.getMsg().getType()); + timeoutQueue.poll(); + executeOnFailure(transactionTask.getOnFailure(), "Task has expired!"); + + BlockingQueue queue = transactionMap.get(transactionTask.getMsg().getTransactionData().getOriginatorId()); + if (queue != null) { + queue.poll(); + TbTransactionTask nextTransactionTask = queue.peek(); + if (nextTransactionTask != null) { + executeOnSuccess(nextTransactionTask.getOnStart(), nextTransactionTask.getMsg()); + } + } + } else { + try { + log.trace("Task has not expired! Continue executing...[{}][{}]", transactionTask.getMsg().getId(), transactionTask.getMsg().getType()); + TimeUnit.MILLISECONDS.sleep(duration); + } catch (InterruptedException e) { + throw new IllegalStateException("Thread interrupted", e); + } + } + } + } finally { + transactionLock.unlock(); + } + } else { + try { + log.trace("Queue is empty, waiting for tasks!"); + TimeUnit.SECONDS.sleep(1); + } catch (InterruptedException e) { + throw new IllegalStateException("Thread interrupted", e); + } + } + } + }); + } + + private void executeOnFailure(Consumer onFailure, String exception) { + executeCallback(() -> { + onFailure.accept(new RuntimeException(exception)); + return null; + }); + } + + private void executeOnSuccess(Consumer onSuccess, TbMsg tbMsg) { + executeCallback(() -> { + onSuccess.accept(tbMsg); + return null; + }); + } + + private void executeCallback(Callable task) { + callbackExecutor.executeAsync(task); + } + + private void sendTransactionEventToRemoteServer(EntityId entityId, UUID transactionId, ServerAddress address) { + log.trace("[{}][{}] Originator is monitored on other server: {}", entityId, transactionId, address); + ClusterAPIProtos.TransactionEndServiceMsgProto.Builder builder = ClusterAPIProtos.TransactionEndServiceMsgProto.newBuilder(); + builder.setEntityType(entityId.getEntityType().name()); + builder.setOriginatorIdMSB(entityId.getId().getMostSignificantBits()); + builder.setOriginatorIdLSB(entityId.getId().getLeastSignificantBits()); + builder.setTransactionIdMSB(transactionId.getMostSignificantBits()); + builder.setTransactionIdLSB(transactionId.getLeastSignificantBits()); + clusterRpcService.tell(address, ClusterAPIProtos.MessageType.CLUSTER_TRANSACTION_SERVICE_MESSAGE, builder.build().toByteArray()); + } +} diff --git a/application/src/main/java/org/thingsboard/server/service/transaction/TbTransactionTask.java b/application/src/main/java/org/thingsboard/server/service/transaction/TbTransactionTask.java new file mode 100644 index 0000000000..49e29bac16 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/transaction/TbTransactionTask.java @@ -0,0 +1,44 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.transaction; + +import lombok.AllArgsConstructor; +import lombok.Data; +import org.thingsboard.server.common.msg.TbMsg; + +import java.util.function.Consumer; + +@Data +@AllArgsConstructor +public final class TbTransactionTask { + + private final TbMsg msg; + private final Consumer onStart; + private final Consumer onEnd; + private final Consumer onFailure; + private final long expirationTime; + + private boolean isCompleted; + + public TbTransactionTask(TbMsg msg, Consumer onStart, Consumer onEnd, Consumer onFailure, long expirationTime) { + this.msg = msg; + this.onStart = onStart; + this.onEnd = onEnd; + this.onFailure = onFailure; + this.expirationTime = expirationTime; + this.isCompleted = false; + } +} diff --git a/application/src/main/proto/cluster.proto b/application/src/main/proto/cluster.proto index 01e932369b..4200feea91 100644 --- a/application/src/main/proto/cluster.proto +++ b/application/src/main/proto/cluster.proto @@ -60,6 +60,7 @@ enum MessageType { CLUSTER_RPC_FROM_DEVICE_RESPONSE_MESSAGE = 12; CLUSTER_DEVICE_STATE_SERVICE_MESSAGE = 13; + CLUSTER_TRANSACTION_SERVICE_MESSAGE = 14; } // Messages related to CLUSTER_TELEMETRY_MESSAGE @@ -142,3 +143,11 @@ message DeviceStateServiceMsgProto { bool updated = 6; bool deleted = 7; } + +message TransactionEndServiceMsgProto { + string entityType = 1; + int64 originatorIdMSB = 2; + int64 originatorIdLSB = 3; + int64 transactionIdMSB = 4; + int64 transactionIdLSB = 5; +} diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 0f87429bb2..85b8f0aa83 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -213,6 +213,11 @@ actors: node: # Errors for particular actor are persisted once per specified amount of milliseconds error_persist_frequency: "${ACTORS_RULE_NODE_ERROR_FREQUENCY:3000}" + transaction: + # Size of queues which store messages for transaction rule nodes + queue_size: "${ACTORS_RULE_TRANSACTION_QUEUE_SIZE:20}" + # Time in milliseconds for transaction to complete + duration: "${ACTORS_RULE_TRANSACTION_DURATION:15000}" statistics: # Enable/disable actor statistics enabled: "${ACTORS_STATISTICS_ENABLED:true}" @@ -310,7 +315,7 @@ spring: password: "${SPRING_DATASOURCE_PASSWORD:}" # PostgreSQL DAO Configuration -# spring: +#spring: # data: # sql: # repositories: diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java index a6eb64e544..325f8eeb7b 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java @@ -41,6 +41,7 @@ public final class TbMsg implements Serializable { private final TbMsgMetaData metaData; private final TbMsgDataType dataType; private final String data; + private final TbMsgTransactionData transactionData; //The following fields are not persisted to DB, because they can always be recovered from the context; private final RuleChainId ruleChainId; @@ -55,11 +56,17 @@ public final class TbMsg implements Serializable { this.metaData = metaData; this.data = data; this.dataType = TbMsgDataType.JSON; + this.transactionData = new TbMsgTransactionData(id, originator); this.ruleChainId = ruleChainId; this.ruleNodeId = ruleNodeId; this.clusterPartition = clusterPartition; } + public TbMsg(UUID id, String type, EntityId originator, TbMsgMetaData metaData, TbMsgDataType dataType, String data, + RuleChainId ruleChainId, RuleNodeId ruleNodeId, long clusterPartition) { + this(id, type, originator, metaData, dataType, data, new TbMsgTransactionData(id, originator), ruleChainId, ruleNodeId, clusterPartition); + } + public static ByteBuffer toBytes(TbMsg msg) { MsgProtos.TbMsgProto.Builder builder = MsgProtos.TbMsgProto.newBuilder(); builder.setId(msg.getId().toString()); @@ -82,6 +89,16 @@ public final class TbMsg implements Serializable { builder.setMetaData(MsgProtos.TbMsgMetaDataProto.newBuilder().putAllData(msg.getMetaData().getData()).build()); } + TbMsgTransactionData transactionData = msg.getTransactionData(); + if (transactionData != null) { + MsgProtos.TbMsgTransactionDataProto.Builder transactionBuilder = MsgProtos.TbMsgTransactionDataProto.newBuilder(); + transactionBuilder.setId(transactionData.getTransactionId().toString()); + transactionBuilder.setEntityType(transactionData.getOriginatorId().getEntityType().name()); + transactionBuilder.setEntityIdMSB(transactionData.getOriginatorId().getId().getMostSignificantBits()); + transactionBuilder.setEntityIdLSB(transactionData.getOriginatorId().getId().getLeastSignificantBits()); + builder.setTransactionData(transactionBuilder.build()); + } + builder.setDataType(msg.getDataType().ordinal()); builder.setData(msg.getData()); byte[] bytes = builder.build().toByteArray(); @@ -92,6 +109,9 @@ public final class TbMsg implements Serializable { try { MsgProtos.TbMsgProto proto = MsgProtos.TbMsgProto.parseFrom(buffer.array()); TbMsgMetaData metaData = new TbMsgMetaData(proto.getMetaData().getDataMap()); + EntityId transactionEntityId = EntityIdFactory.getByTypeAndUuid(proto.getTransactionData().getEntityType(), + new UUID(proto.getTransactionData().getEntityIdMSB(), proto.getTransactionData().getEntityIdLSB())); + TbMsgTransactionData transactionData = new TbMsgTransactionData(UUID.fromString(proto.getTransactionData().getId()), transactionEntityId); EntityId entityId = EntityIdFactory.getByTypeAndUuid(proto.getEntityType(), new UUID(proto.getEntityIdMSB(), proto.getEntityIdLSB())); RuleChainId ruleChainId = new RuleChainId(new UUID(proto.getRuleChainIdMSB(), proto.getRuleChainIdLSB())); RuleNodeId ruleNodeId = null; @@ -99,7 +119,7 @@ public final class TbMsg implements Serializable { ruleNodeId = new RuleNodeId(new UUID(proto.getRuleNodeIdMSB(), proto.getRuleNodeIdLSB())); } TbMsgDataType dataType = TbMsgDataType.values()[proto.getDataType()]; - return new TbMsg(UUID.fromString(proto.getId()), proto.getType(), entityId, metaData, dataType, proto.getData(), ruleChainId, ruleNodeId, proto.getClusterPartition()); + return new TbMsg(UUID.fromString(proto.getId()), proto.getType(), entityId, metaData, dataType, proto.getData(), transactionData, ruleChainId, ruleNodeId, proto.getClusterPartition()); } catch (InvalidProtocolBufferException e) { throw new IllegalStateException("Could not parse protobuf for TbMsg", e); } diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsgTransactionData.java b/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsgTransactionData.java new file mode 100644 index 0000000000..466f9ce4af --- /dev/null +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsgTransactionData.java @@ -0,0 +1,30 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.common.msg; + +import lombok.Data; +import org.thingsboard.server.common.data.id.EntityId; + +import java.io.Serializable; +import java.util.UUID; + +@Data +public final class TbMsgTransactionData implements Serializable { + + private final UUID transactionId; + private final EntityId originatorId; + +} diff --git a/common/message/src/main/proto/tbmsg.proto b/common/message/src/main/proto/tbmsg.proto index 60003dc420..8bd0e4b6bd 100644 --- a/common/message/src/main/proto/tbmsg.proto +++ b/common/message/src/main/proto/tbmsg.proto @@ -23,6 +23,13 @@ message TbMsgMetaDataProto { map data = 1; } +message TbMsgTransactionDataProto { + string id = 1; + string entityType = 2; + int64 entityIdMSB = 3; + int64 entityIdLSB = 4; +} + message TbMsgProto { string id = 1; string type = 2; @@ -39,7 +46,9 @@ message TbMsgProto { TbMsgMetaDataProto metaData = 11; - int32 dataType = 12; - string data = 13; + TbMsgTransactionDataProto transactionData = 12; + + int32 dataType = 13; + string data = 14; } \ No newline at end of file diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleChainTransactionService.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleChainTransactionService.java new file mode 100644 index 0000000000..920af74854 --- /dev/null +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleChainTransactionService.java @@ -0,0 +1,31 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.api; + +import org.thingsboard.server.common.msg.TbMsg; +import org.thingsboard.server.common.msg.cluster.ServerAddress; + +import java.util.function.Consumer; + +public interface RuleChainTransactionService { + + void beginTransaction(TbMsg msg, Consumer onStart, Consumer onEnd, Consumer onFailure); + + void endTransaction(TbMsg msg, Consumer onSuccess, Consumer onFailure); + + void onRemoteTransactionMsg(ServerAddress serverAddress, byte[] bytes); + +} diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java index 37c4a51483..29e7b262dd 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java @@ -103,4 +103,6 @@ public interface TbContext { ScriptEngine createJsScriptEngine(String script, String... argNames); String getNodeId(); + + RuleChainTransactionService getRuleChainTransactionService(); } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transaction/TbTransactionBeginNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transaction/TbTransactionBeginNode.java new file mode 100644 index 0000000000..75175ef7fa --- /dev/null +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transaction/TbTransactionBeginNode.java @@ -0,0 +1,75 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.transaction; + +import lombok.extern.slf4j.Slf4j; +import org.thingsboard.rule.engine.api.EmptyNodeConfiguration; +import org.thingsboard.rule.engine.api.RuleNode; +import org.thingsboard.rule.engine.api.TbContext; +import org.thingsboard.rule.engine.api.TbNode; +import org.thingsboard.rule.engine.api.TbNodeConfiguration; +import org.thingsboard.rule.engine.api.TbNodeException; +import org.thingsboard.rule.engine.api.util.TbNodeUtils; +import org.thingsboard.server.common.data.plugin.ComponentType; +import org.thingsboard.server.common.msg.TbMsg; +import org.thingsboard.server.common.msg.TbMsgDataType; +import org.thingsboard.server.common.msg.TbMsgTransactionData; + +import java.util.concurrent.ExecutionException; + +import static org.thingsboard.rule.engine.api.TbRelationTypes.SUCCESS; + +@Slf4j +@RuleNode( + type = ComponentType.ACTION, + name = "transaction start", + configClazz = EmptyNodeConfiguration.class, + nodeDescription = "", + nodeDetails = "", + uiResources = {"static/rulenode/rulenode-core-config.js"}, + configDirective = "tbNodeEmptyConfig") +public class TbTransactionBeginNode implements TbNode { + + private EmptyNodeConfiguration config; + + @Override + public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException { + this.config = TbNodeUtils.convert(configuration, EmptyNodeConfiguration.class); + } + + @Override + public void onMsg(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException, TbNodeException { + log.trace("Msg enters transaction - [{}][{}]", msg.getId(), msg.getType()); + + TbMsgTransactionData transactionData = new TbMsgTransactionData(msg.getId(), msg.getOriginator()); + TbMsg tbMsg = new TbMsg(msg.getId(), msg.getType(), msg.getOriginator(), msg.getMetaData(), TbMsgDataType.JSON, + msg.getData(), transactionData, msg.getRuleChainId(), msg.getRuleNodeId(), msg.getClusterPartition()); + + ctx.getRuleChainTransactionService().beginTransaction(tbMsg, startMsg -> { + log.trace("Transaction starting...[{}][{}]", startMsg.getId(), startMsg.getType()); + ctx.tellNext(startMsg, SUCCESS); + }, endMsg -> log.trace("Transaction ended successfully...[{}][{}]", endMsg.getId(), endMsg.getType()), + throwable -> { + log.error("Transaction failed! [{}][{}]", tbMsg.getId(), tbMsg.getType(), throwable); + ctx.tellFailure(tbMsg, throwable); + }); + } + + @Override + public void destroy() { + + } +} diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transaction/TbTransactionEndNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transaction/TbTransactionEndNode.java new file mode 100644 index 0000000000..a51d97e32a --- /dev/null +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transaction/TbTransactionEndNode.java @@ -0,0 +1,64 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.transaction; + +import lombok.extern.slf4j.Slf4j; +import org.thingsboard.rule.engine.api.EmptyNodeConfiguration; +import org.thingsboard.rule.engine.api.RuleNode; +import org.thingsboard.rule.engine.api.TbContext; +import org.thingsboard.rule.engine.api.TbNode; +import org.thingsboard.rule.engine.api.TbNodeConfiguration; +import org.thingsboard.rule.engine.api.TbNodeException; +import org.thingsboard.rule.engine.api.util.TbNodeUtils; +import org.thingsboard.server.common.data.plugin.ComponentType; +import org.thingsboard.server.common.msg.TbMsg; + +import java.util.concurrent.ExecutionException; + +import static org.thingsboard.rule.engine.api.TbRelationTypes.SUCCESS; + +@Slf4j +@RuleNode( + type = ComponentType.ACTION, + name = "transaction end", + configClazz = EmptyNodeConfiguration.class, + nodeDescription = "", + nodeDetails = "", + uiResources = {"static/rulenode/rulenode-core-config.js"}, + configDirective = ("tbNodeEmptyConfig") +) +public class TbTransactionEndNode implements TbNode { + + private EmptyNodeConfiguration config; + + @Override + public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException { + this.config = TbNodeUtils.convert(configuration, EmptyNodeConfiguration.class); + } + + @Override + public void onMsg(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException, TbNodeException { + ctx.getRuleChainTransactionService().endTransaction(msg, + successMsg -> ctx.tellNext(successMsg, SUCCESS), + throwable -> ctx.tellFailure(msg, throwable)); + log.trace("Msg left transaction - [{}][{}]", msg.getId(), msg.getType()); + } + + @Override + public void destroy() { + + } +}