Merge pull request #1301 from dmytro-landiak/transaction/development

Transaction service and rule nodes
This commit is contained in:
Andrew Shvayka 2018-12-08 15:19:17 +02:00 committed by GitHub
commit 9629cd0ca1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 563 additions and 10 deletions

View File

@ -34,6 +34,7 @@ import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Lazy; import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.thingsboard.rule.engine.api.MailService; import org.thingsboard.rule.engine.api.MailService;
import org.thingsboard.rule.engine.api.RuleChainTransactionService;
import org.thingsboard.server.actors.service.ActorService; import org.thingsboard.server.actors.service.ActorService;
import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.Event; import org.thingsboard.server.common.data.Event;
@ -222,6 +223,11 @@ public class ActorSystemContext {
@Getter @Getter
private RuleEngineTransportService ruleEngineTransportService; private RuleEngineTransportService ruleEngineTransportService;
@Lazy
@Autowired
@Getter
private RuleChainTransactionService ruleChainTransactionService;
@Value("${cluster.partition_id}") @Value("${cluster.partition_id}")
@Getter @Getter
private long queuePartitionId; private long queuePartitionId;

View File

@ -20,6 +20,7 @@ import com.datastax.driver.core.utils.UUIDs;
import org.springframework.util.StringUtils; import org.springframework.util.StringUtils;
import org.thingsboard.rule.engine.api.ListeningExecutor; import org.thingsboard.rule.engine.api.ListeningExecutor;
import org.thingsboard.rule.engine.api.MailService; import org.thingsboard.rule.engine.api.MailService;
import org.thingsboard.rule.engine.api.RuleChainTransactionService;
import org.thingsboard.rule.engine.api.RuleEngineDeviceRpcRequest; import org.thingsboard.rule.engine.api.RuleEngineDeviceRpcRequest;
import org.thingsboard.rule.engine.api.RuleEngineDeviceRpcResponse; import org.thingsboard.rule.engine.api.RuleEngineDeviceRpcResponse;
import org.thingsboard.rule.engine.api.RuleEngineRpcService; import org.thingsboard.rule.engine.api.RuleEngineRpcService;
@ -124,7 +125,7 @@ class DefaultTbContext implements TbContext {
@Override @Override
public TbMsg transformMsg(TbMsg origMsg, String type, EntityId originator, TbMsgMetaData metaData, String data) { public TbMsg transformMsg(TbMsg origMsg, String type, EntityId originator, TbMsgMetaData metaData, String data) {
return new TbMsg(origMsg.getId(), type, originator, metaData.copy(), data, origMsg.getRuleChainId(), origMsg.getRuleNodeId(), mainCtx.getQueuePartitionId()); return new TbMsg(origMsg.getId(), type, originator, metaData.copy(), origMsg.getDataType(), data, origMsg.getTransactionData(), origMsg.getRuleChainId(), origMsg.getRuleNodeId(), mainCtx.getQueuePartitionId());
} }
@Override @Override
@ -232,6 +233,11 @@ class DefaultTbContext implements TbContext {
return mainCtx.getEntityViewService(); return mainCtx.getEntityViewService();
} }
@Override
public RuleChainTransactionService getRuleChainTransactionService() {
return mainCtx.getRuleChainTransactionService();
}
@Override @Override
public MailService getMailService() { public MailService getMailService() {
if (mainCtx.isAllowSystemMailService()) { if (mainCtx.isAllowSystemMailService()) {

View File

@ -56,12 +56,8 @@ import scala.concurrent.duration.Duration;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy; import javax.annotation.PreDestroy;
import java.util.Arrays;
import java.util.UUID;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import static org.thingsboard.server.gen.cluster.ClusterAPIProtos.MessageType.CLUSTER_ACTOR_MESSAGE; import static org.thingsboard.server.gen.cluster.ClusterAPIProtos.MessageType.CLUSTER_ACTOR_MESSAGE;
@ -235,6 +231,9 @@ public class DefaultActorService implements ActorService {
case CLUSTER_DEVICE_STATE_SERVICE_MESSAGE: case CLUSTER_DEVICE_STATE_SERVICE_MESSAGE:
actorContext.getDeviceStateService().onRemoteMsg(serverAddress, msg.getPayload().toByteArray()); actorContext.getDeviceStateService().onRemoteMsg(serverAddress, msg.getPayload().toByteArray());
break; break;
case CLUSTER_TRANSACTION_SERVICE_MESSAGE:
actorContext.getRuleChainTransactionService().onRemoteTransactionMsg(serverAddress, msg.getPayload().toByteArray());
break;
} }
} }

View File

@ -19,8 +19,6 @@ import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.msg.cluster.ServerAddress; import org.thingsboard.server.common.msg.cluster.ServerAddress;
import java.util.Optional;
/** /**
* Created by ashvayka on 01.05.18. * Created by ashvayka on 01.05.18.
*/ */

View File

@ -0,0 +1,255 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.transaction;
import com.google.protobuf.InvalidProtocolBufferException;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.thingsboard.rule.engine.api.RuleChainTransactionService;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.EntityIdFactory;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
import org.thingsboard.server.service.cluster.routing.ClusterRoutingService;
import org.thingsboard.server.service.cluster.rpc.ClusterRpcService;
import org.thingsboard.server.service.executors.DbCallbackExecutorService;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.Optional;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
@Service
@Slf4j
public class BaseRuleChainTransactionService implements RuleChainTransactionService {
@Autowired
private ClusterRoutingService routingService;
@Autowired
private ClusterRpcService clusterRpcService;
@Autowired
private DbCallbackExecutorService callbackExecutor;
@Value("${actors.rule.transaction.queue_size}")
private int finalQueueSize;
@Value("${actors.rule.transaction.duration}")
private long duration;
private final Lock transactionLock = new ReentrantLock();
private final ConcurrentMap<EntityId, BlockingQueue<TbTransactionTask>> transactionMap = new ConcurrentHashMap<>();
private final Queue<TbTransactionTask> timeoutQueue = new ConcurrentLinkedQueue<>();
private ExecutorService timeoutExecutor;
@PostConstruct
public void init() {
timeoutExecutor = Executors.newSingleThreadExecutor();
executeOnTimeout();
}
@PreDestroy
public void destroy() {
if (timeoutExecutor != null) {
timeoutExecutor.shutdownNow();
}
}
@Override
public void beginTransaction(TbMsg msg, Consumer<TbMsg> onStart, Consumer<TbMsg> onEnd, Consumer<Throwable> onFailure) {
transactionLock.lock();
try {
BlockingQueue<TbTransactionTask> queue = transactionMap.computeIfAbsent(msg.getTransactionData().getOriginatorId(), id ->
new LinkedBlockingQueue<>(finalQueueSize));
TbTransactionTask transactionTask = new TbTransactionTask(msg, onStart, onEnd, onFailure, System.currentTimeMillis() + duration);
int queueSize = queue.size();
if (queueSize >= finalQueueSize) {
executeOnFailure(transactionTask.getOnFailure(), "Queue has no space!");
} else {
addMsgToQueues(queue, transactionTask);
if (queueSize == 0) {
executeOnSuccess(transactionTask.getOnStart(), transactionTask.getMsg());
} else {
log.trace("Msg [{}][{}] is waiting to start transaction!", msg.getId(), msg.getType());
}
}
} finally {
transactionLock.unlock();
}
}
@Override
public void endTransaction(TbMsg msg, Consumer<TbMsg> onSuccess, Consumer<Throwable> onFailure) {
EntityId originatorId = msg.getTransactionData().getOriginatorId();
UUID transactionId = msg.getTransactionData().getTransactionId();
Optional<ServerAddress> address = routingService.resolveById(originatorId);
if (address.isPresent()) {
sendTransactionEventToRemoteServer(originatorId, transactionId, address.get());
executeOnSuccess(onSuccess, msg);
} else {
endLocalTransaction(transactionId, originatorId, onSuccess, onFailure);
}
}
@Override
public void onRemoteTransactionMsg(ServerAddress serverAddress, byte[] data) {
ClusterAPIProtos.TransactionEndServiceMsgProto proto;
try {
proto = ClusterAPIProtos.TransactionEndServiceMsgProto.parseFrom(data);
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException(e);
}
EntityId originatorId = EntityIdFactory.getByTypeAndUuid(proto.getEntityType(), new UUID(proto.getOriginatorIdMSB(), proto.getOriginatorIdLSB()));
UUID transactionId = new UUID(proto.getTransactionIdMSB(), proto.getTransactionIdLSB());
endLocalTransaction(transactionId, originatorId, msg -> {
}, error -> {
});
}
private void addMsgToQueues(BlockingQueue<TbTransactionTask> queue, TbTransactionTask transactionTask) {
queue.offer(transactionTask);
timeoutQueue.offer(transactionTask);
log.trace("Added msg to queue, size: [{}]", queue.size());
}
private void endLocalTransaction(UUID transactionId, EntityId originatorId, Consumer<TbMsg> onSuccess, Consumer<Throwable> onFailure) {
transactionLock.lock();
try {
BlockingQueue<TbTransactionTask> queue = transactionMap.computeIfAbsent(originatorId, id ->
new LinkedBlockingQueue<>(finalQueueSize));
TbTransactionTask currentTransactionTask = queue.peek();
if (currentTransactionTask != null) {
if (currentTransactionTask.getMsg().getTransactionData().getTransactionId().equals(transactionId)) {
currentTransactionTask.setCompleted(true);
queue.poll();
log.trace("Removed msg from queue, size [{}]", queue.size());
executeOnSuccess(currentTransactionTask.getOnEnd(), currentTransactionTask.getMsg());
executeOnSuccess(onSuccess, currentTransactionTask.getMsg());
TbTransactionTask nextTransactionTask = queue.peek();
if (nextTransactionTask != null) {
executeOnSuccess(nextTransactionTask.getOnStart(), nextTransactionTask.getMsg());
}
} else {
log.trace("Task has expired!");
executeOnFailure(onFailure, "Task has expired!");
}
} else {
log.trace("Queue is empty, previous task has expired!");
executeOnFailure(onFailure, "Queue is empty, previous task has expired!");
}
} finally {
transactionLock.unlock();
}
}
private void executeOnTimeout() {
timeoutExecutor.submit(() -> {
while (true) {
TbTransactionTask transactionTask = timeoutQueue.peek();
if (transactionTask != null) {
transactionLock.lock();
try {
if (transactionTask.isCompleted()) {
timeoutQueue.poll();
} else {
if (System.currentTimeMillis() > transactionTask.getExpirationTime()) {
log.trace("Task has expired! Deleting it...[{}][{}]", transactionTask.getMsg().getId(), transactionTask.getMsg().getType());
timeoutQueue.poll();
executeOnFailure(transactionTask.getOnFailure(), "Task has expired!");
BlockingQueue<TbTransactionTask> queue = transactionMap.get(transactionTask.getMsg().getTransactionData().getOriginatorId());
if (queue != null) {
queue.poll();
TbTransactionTask nextTransactionTask = queue.peek();
if (nextTransactionTask != null) {
executeOnSuccess(nextTransactionTask.getOnStart(), nextTransactionTask.getMsg());
}
}
} else {
try {
log.trace("Task has not expired! Continue executing...[{}][{}]", transactionTask.getMsg().getId(), transactionTask.getMsg().getType());
TimeUnit.MILLISECONDS.sleep(duration);
} catch (InterruptedException e) {
throw new IllegalStateException("Thread interrupted", e);
}
}
}
} finally {
transactionLock.unlock();
}
} else {
try {
log.trace("Queue is empty, waiting for tasks!");
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new IllegalStateException("Thread interrupted", e);
}
}
}
});
}
private void executeOnFailure(Consumer<Throwable> onFailure, String exception) {
executeCallback(() -> {
onFailure.accept(new RuntimeException(exception));
return null;
});
}
private void executeOnSuccess(Consumer<TbMsg> onSuccess, TbMsg tbMsg) {
executeCallback(() -> {
onSuccess.accept(tbMsg);
return null;
});
}
private void executeCallback(Callable<Void> task) {
callbackExecutor.executeAsync(task);
}
private void sendTransactionEventToRemoteServer(EntityId entityId, UUID transactionId, ServerAddress address) {
log.trace("[{}][{}] Originator is monitored on other server: {}", entityId, transactionId, address);
ClusterAPIProtos.TransactionEndServiceMsgProto.Builder builder = ClusterAPIProtos.TransactionEndServiceMsgProto.newBuilder();
builder.setEntityType(entityId.getEntityType().name());
builder.setOriginatorIdMSB(entityId.getId().getMostSignificantBits());
builder.setOriginatorIdLSB(entityId.getId().getLeastSignificantBits());
builder.setTransactionIdMSB(transactionId.getMostSignificantBits());
builder.setTransactionIdLSB(transactionId.getLeastSignificantBits());
clusterRpcService.tell(address, ClusterAPIProtos.MessageType.CLUSTER_TRANSACTION_SERVICE_MESSAGE, builder.build().toByteArray());
}
}

View File

@ -0,0 +1,44 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.transaction;
import lombok.AllArgsConstructor;
import lombok.Data;
import org.thingsboard.server.common.msg.TbMsg;
import java.util.function.Consumer;
@Data
@AllArgsConstructor
public final class TbTransactionTask {
private final TbMsg msg;
private final Consumer<TbMsg> onStart;
private final Consumer<TbMsg> onEnd;
private final Consumer<Throwable> onFailure;
private final long expirationTime;
private boolean isCompleted;
public TbTransactionTask(TbMsg msg, Consumer<TbMsg> onStart, Consumer<TbMsg> onEnd, Consumer<Throwable> onFailure, long expirationTime) {
this.msg = msg;
this.onStart = onStart;
this.onEnd = onEnd;
this.onFailure = onFailure;
this.expirationTime = expirationTime;
this.isCompleted = false;
}
}

View File

@ -60,6 +60,7 @@ enum MessageType {
CLUSTER_RPC_FROM_DEVICE_RESPONSE_MESSAGE = 12; CLUSTER_RPC_FROM_DEVICE_RESPONSE_MESSAGE = 12;
CLUSTER_DEVICE_STATE_SERVICE_MESSAGE = 13; CLUSTER_DEVICE_STATE_SERVICE_MESSAGE = 13;
CLUSTER_TRANSACTION_SERVICE_MESSAGE = 14;
} }
// Messages related to CLUSTER_TELEMETRY_MESSAGE // Messages related to CLUSTER_TELEMETRY_MESSAGE
@ -142,3 +143,11 @@ message DeviceStateServiceMsgProto {
bool updated = 6; bool updated = 6;
bool deleted = 7; bool deleted = 7;
} }
message TransactionEndServiceMsgProto {
string entityType = 1;
int64 originatorIdMSB = 2;
int64 originatorIdLSB = 3;
int64 transactionIdMSB = 4;
int64 transactionIdLSB = 5;
}

View File

@ -213,6 +213,11 @@ actors:
node: node:
# Errors for particular actor are persisted once per specified amount of milliseconds # Errors for particular actor are persisted once per specified amount of milliseconds
error_persist_frequency: "${ACTORS_RULE_NODE_ERROR_FREQUENCY:3000}" error_persist_frequency: "${ACTORS_RULE_NODE_ERROR_FREQUENCY:3000}"
transaction:
# Size of queues which store messages for transaction rule nodes
queue_size: "${ACTORS_RULE_TRANSACTION_QUEUE_SIZE:20}"
# Time in milliseconds for transaction to complete
duration: "${ACTORS_RULE_TRANSACTION_DURATION:15000}"
statistics: statistics:
# Enable/disable actor statistics # Enable/disable actor statistics
enabled: "${ACTORS_STATISTICS_ENABLED:true}" enabled: "${ACTORS_STATISTICS_ENABLED:true}"

View File

@ -41,6 +41,7 @@ public final class TbMsg implements Serializable {
private final TbMsgMetaData metaData; private final TbMsgMetaData metaData;
private final TbMsgDataType dataType; private final TbMsgDataType dataType;
private final String data; private final String data;
private final TbMsgTransactionData transactionData;
//The following fields are not persisted to DB, because they can always be recovered from the context; //The following fields are not persisted to DB, because they can always be recovered from the context;
private final RuleChainId ruleChainId; private final RuleChainId ruleChainId;
@ -55,11 +56,17 @@ public final class TbMsg implements Serializable {
this.metaData = metaData; this.metaData = metaData;
this.data = data; this.data = data;
this.dataType = TbMsgDataType.JSON; this.dataType = TbMsgDataType.JSON;
this.transactionData = new TbMsgTransactionData(id, originator);
this.ruleChainId = ruleChainId; this.ruleChainId = ruleChainId;
this.ruleNodeId = ruleNodeId; this.ruleNodeId = ruleNodeId;
this.clusterPartition = clusterPartition; this.clusterPartition = clusterPartition;
} }
public TbMsg(UUID id, String type, EntityId originator, TbMsgMetaData metaData, TbMsgDataType dataType, String data,
RuleChainId ruleChainId, RuleNodeId ruleNodeId, long clusterPartition) {
this(id, type, originator, metaData, dataType, data, new TbMsgTransactionData(id, originator), ruleChainId, ruleNodeId, clusterPartition);
}
public static ByteBuffer toBytes(TbMsg msg) { public static ByteBuffer toBytes(TbMsg msg) {
MsgProtos.TbMsgProto.Builder builder = MsgProtos.TbMsgProto.newBuilder(); MsgProtos.TbMsgProto.Builder builder = MsgProtos.TbMsgProto.newBuilder();
builder.setId(msg.getId().toString()); builder.setId(msg.getId().toString());
@ -82,6 +89,16 @@ public final class TbMsg implements Serializable {
builder.setMetaData(MsgProtos.TbMsgMetaDataProto.newBuilder().putAllData(msg.getMetaData().getData()).build()); builder.setMetaData(MsgProtos.TbMsgMetaDataProto.newBuilder().putAllData(msg.getMetaData().getData()).build());
} }
TbMsgTransactionData transactionData = msg.getTransactionData();
if (transactionData != null) {
MsgProtos.TbMsgTransactionDataProto.Builder transactionBuilder = MsgProtos.TbMsgTransactionDataProto.newBuilder();
transactionBuilder.setId(transactionData.getTransactionId().toString());
transactionBuilder.setEntityType(transactionData.getOriginatorId().getEntityType().name());
transactionBuilder.setEntityIdMSB(transactionData.getOriginatorId().getId().getMostSignificantBits());
transactionBuilder.setEntityIdLSB(transactionData.getOriginatorId().getId().getLeastSignificantBits());
builder.setTransactionData(transactionBuilder.build());
}
builder.setDataType(msg.getDataType().ordinal()); builder.setDataType(msg.getDataType().ordinal());
builder.setData(msg.getData()); builder.setData(msg.getData());
byte[] bytes = builder.build().toByteArray(); byte[] bytes = builder.build().toByteArray();
@ -92,6 +109,9 @@ public final class TbMsg implements Serializable {
try { try {
MsgProtos.TbMsgProto proto = MsgProtos.TbMsgProto.parseFrom(buffer.array()); MsgProtos.TbMsgProto proto = MsgProtos.TbMsgProto.parseFrom(buffer.array());
TbMsgMetaData metaData = new TbMsgMetaData(proto.getMetaData().getDataMap()); TbMsgMetaData metaData = new TbMsgMetaData(proto.getMetaData().getDataMap());
EntityId transactionEntityId = EntityIdFactory.getByTypeAndUuid(proto.getTransactionData().getEntityType(),
new UUID(proto.getTransactionData().getEntityIdMSB(), proto.getTransactionData().getEntityIdLSB()));
TbMsgTransactionData transactionData = new TbMsgTransactionData(UUID.fromString(proto.getTransactionData().getId()), transactionEntityId);
EntityId entityId = EntityIdFactory.getByTypeAndUuid(proto.getEntityType(), new UUID(proto.getEntityIdMSB(), proto.getEntityIdLSB())); EntityId entityId = EntityIdFactory.getByTypeAndUuid(proto.getEntityType(), new UUID(proto.getEntityIdMSB(), proto.getEntityIdLSB()));
RuleChainId ruleChainId = new RuleChainId(new UUID(proto.getRuleChainIdMSB(), proto.getRuleChainIdLSB())); RuleChainId ruleChainId = new RuleChainId(new UUID(proto.getRuleChainIdMSB(), proto.getRuleChainIdLSB()));
RuleNodeId ruleNodeId = null; RuleNodeId ruleNodeId = null;
@ -99,7 +119,7 @@ public final class TbMsg implements Serializable {
ruleNodeId = new RuleNodeId(new UUID(proto.getRuleNodeIdMSB(), proto.getRuleNodeIdLSB())); ruleNodeId = new RuleNodeId(new UUID(proto.getRuleNodeIdMSB(), proto.getRuleNodeIdLSB()));
} }
TbMsgDataType dataType = TbMsgDataType.values()[proto.getDataType()]; TbMsgDataType dataType = TbMsgDataType.values()[proto.getDataType()];
return new TbMsg(UUID.fromString(proto.getId()), proto.getType(), entityId, metaData, dataType, proto.getData(), ruleChainId, ruleNodeId, proto.getClusterPartition()); return new TbMsg(UUID.fromString(proto.getId()), proto.getType(), entityId, metaData, dataType, proto.getData(), transactionData, ruleChainId, ruleNodeId, proto.getClusterPartition());
} catch (InvalidProtocolBufferException e) { } catch (InvalidProtocolBufferException e) {
throw new IllegalStateException("Could not parse protobuf for TbMsg", e); throw new IllegalStateException("Could not parse protobuf for TbMsg", e);
} }

View File

@ -0,0 +1,30 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.common.msg;
import lombok.Data;
import org.thingsboard.server.common.data.id.EntityId;
import java.io.Serializable;
import java.util.UUID;
@Data
public final class TbMsgTransactionData implements Serializable {
private final UUID transactionId;
private final EntityId originatorId;
}

View File

@ -23,6 +23,13 @@ message TbMsgMetaDataProto {
map<string, string> data = 1; map<string, string> data = 1;
} }
message TbMsgTransactionDataProto {
string id = 1;
string entityType = 2;
int64 entityIdMSB = 3;
int64 entityIdLSB = 4;
}
message TbMsgProto { message TbMsgProto {
string id = 1; string id = 1;
string type = 2; string type = 2;
@ -39,7 +46,9 @@ message TbMsgProto {
TbMsgMetaDataProto metaData = 11; TbMsgMetaDataProto metaData = 11;
int32 dataType = 12; TbMsgTransactionDataProto transactionData = 12;
string data = 13;
int32 dataType = 13;
string data = 14;
} }

View File

@ -0,0 +1,31 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.api;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
import java.util.function.Consumer;
public interface RuleChainTransactionService {
void beginTransaction(TbMsg msg, Consumer<TbMsg> onStart, Consumer<TbMsg> onEnd, Consumer<Throwable> onFailure);
void endTransaction(TbMsg msg, Consumer<TbMsg> onSuccess, Consumer<Throwable> onFailure);
void onRemoteTransactionMsg(ServerAddress serverAddress, byte[] bytes);
}

View File

@ -103,4 +103,6 @@ public interface TbContext {
ScriptEngine createJsScriptEngine(String script, String... argNames); ScriptEngine createJsScriptEngine(String script, String... argNames);
String getNodeId(); String getNodeId();
RuleChainTransactionService getRuleChainTransactionService();
} }

View File

@ -0,0 +1,75 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.transaction;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.rule.engine.api.EmptyNodeConfiguration;
import org.thingsboard.rule.engine.api.RuleNode;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNode;
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.rule.engine.api.util.TbNodeUtils;
import org.thingsboard.server.common.data.plugin.ComponentType;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgDataType;
import org.thingsboard.server.common.msg.TbMsgTransactionData;
import java.util.concurrent.ExecutionException;
import static org.thingsboard.rule.engine.api.TbRelationTypes.SUCCESS;
@Slf4j
@RuleNode(
type = ComponentType.ACTION,
name = "transaction start",
configClazz = EmptyNodeConfiguration.class,
nodeDescription = "",
nodeDetails = "",
uiResources = {"static/rulenode/rulenode-core-config.js"},
configDirective = "tbNodeEmptyConfig")
public class TbTransactionBeginNode implements TbNode {
private EmptyNodeConfiguration config;
@Override
public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
this.config = TbNodeUtils.convert(configuration, EmptyNodeConfiguration.class);
}
@Override
public void onMsg(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException, TbNodeException {
log.trace("Msg enters transaction - [{}][{}]", msg.getId(), msg.getType());
TbMsgTransactionData transactionData = new TbMsgTransactionData(msg.getId(), msg.getOriginator());
TbMsg tbMsg = new TbMsg(msg.getId(), msg.getType(), msg.getOriginator(), msg.getMetaData(), TbMsgDataType.JSON,
msg.getData(), transactionData, msg.getRuleChainId(), msg.getRuleNodeId(), msg.getClusterPartition());
ctx.getRuleChainTransactionService().beginTransaction(tbMsg, startMsg -> {
log.trace("Transaction starting...[{}][{}]", startMsg.getId(), startMsg.getType());
ctx.tellNext(startMsg, SUCCESS);
}, endMsg -> log.trace("Transaction ended successfully...[{}][{}]", endMsg.getId(), endMsg.getType()),
throwable -> {
log.error("Transaction failed! [{}][{}]", tbMsg.getId(), tbMsg.getType(), throwable);
ctx.tellFailure(tbMsg, throwable);
});
}
@Override
public void destroy() {
}
}

View File

@ -0,0 +1,64 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.transaction;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.rule.engine.api.EmptyNodeConfiguration;
import org.thingsboard.rule.engine.api.RuleNode;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNode;
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.rule.engine.api.util.TbNodeUtils;
import org.thingsboard.server.common.data.plugin.ComponentType;
import org.thingsboard.server.common.msg.TbMsg;
import java.util.concurrent.ExecutionException;
import static org.thingsboard.rule.engine.api.TbRelationTypes.SUCCESS;
@Slf4j
@RuleNode(
type = ComponentType.ACTION,
name = "transaction end",
configClazz = EmptyNodeConfiguration.class,
nodeDescription = "",
nodeDetails = "",
uiResources = {"static/rulenode/rulenode-core-config.js"},
configDirective = ("tbNodeEmptyConfig")
)
public class TbTransactionEndNode implements TbNode {
private EmptyNodeConfiguration config;
@Override
public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
this.config = TbNodeUtils.convert(configuration, EmptyNodeConfiguration.class);
}
@Override
public void onMsg(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException, TbNodeException {
ctx.getRuleChainTransactionService().endTransaction(msg,
successMsg -> ctx.tellNext(successMsg, SUCCESS),
throwable -> ctx.tellFailure(msg, throwable));
log.trace("Msg left transaction - [{}][{}]", msg.getId(), msg.getType());
}
@Override
public void destroy() {
}
}