No more Synchronization nodes
This commit is contained in:
		
							parent
							
								
									63bac58ae8
								
							
						
					
					
						commit
						e991c0ef23
					
				@ -36,7 +36,6 @@ import org.springframework.data.redis.core.RedisTemplate;
 | 
			
		||||
import org.springframework.scheduling.annotation.Scheduled;
 | 
			
		||||
import org.springframework.stereotype.Component;
 | 
			
		||||
import org.thingsboard.rule.engine.api.MailService;
 | 
			
		||||
import org.thingsboard.rule.engine.api.RuleChainTransactionService;
 | 
			
		||||
import org.thingsboard.server.actors.service.ActorService;
 | 
			
		||||
import org.thingsboard.server.actors.tenant.DebugTbRateLimits;
 | 
			
		||||
import org.thingsboard.server.common.data.DataConstants;
 | 
			
		||||
@ -67,7 +66,6 @@ import org.thingsboard.server.dao.timeseries.TimeseriesService;
 | 
			
		||||
import org.thingsboard.server.dao.user.UserService;
 | 
			
		||||
import org.thingsboard.server.queue.discovery.PartitionService;
 | 
			
		||||
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
 | 
			
		||||
import org.thingsboard.server.queue.provider.TbQueueProducerProvider;
 | 
			
		||||
import org.thingsboard.server.service.component.ComponentDiscoveryService;
 | 
			
		||||
import org.thingsboard.server.service.encoding.DataDecodingEncodingService;
 | 
			
		||||
import org.thingsboard.server.service.executors.DbCallbackExecutorService;
 | 
			
		||||
@ -246,10 +244,6 @@ public class ActorSystemContext {
 | 
			
		||||
    @Getter
 | 
			
		||||
    private TbCoreDeviceRpcService tbCoreDeviceRpcService;
 | 
			
		||||
 | 
			
		||||
    @Autowired(required = false)
 | 
			
		||||
    @Getter
 | 
			
		||||
    private RuleChainTransactionService ruleChainTransactionService;
 | 
			
		||||
 | 
			
		||||
    @Value("${actors.session.max_concurrent_sessions_per_device:1}")
 | 
			
		||||
    @Getter
 | 
			
		||||
    private long maxConcurrentSessionsPerDevice;
 | 
			
		||||
 | 
			
		||||
@ -24,7 +24,6 @@ import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
import org.springframework.data.redis.core.RedisTemplate;
 | 
			
		||||
import org.thingsboard.common.util.ListeningExecutor;
 | 
			
		||||
import org.thingsboard.rule.engine.api.MailService;
 | 
			
		||||
import org.thingsboard.rule.engine.api.RuleChainTransactionService;
 | 
			
		||||
import org.thingsboard.rule.engine.api.RuleEngineRpcService;
 | 
			
		||||
import org.thingsboard.rule.engine.api.RuleEngineTelemetryService;
 | 
			
		||||
import org.thingsboard.rule.engine.api.ScriptEngine;
 | 
			
		||||
@ -378,11 +377,6 @@ class DefaultTbContext implements TbContext {
 | 
			
		||||
        return mainCtx.getEntityViewService();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public RuleChainTransactionService getRuleChainTransactionService() {
 | 
			
		||||
        return mainCtx.getRuleChainTransactionService();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public EventLoopGroup getSharedEventLoop() {
 | 
			
		||||
        return mainCtx.getSharedEventLoopGroupService().getSharedEventLoopGroup();
 | 
			
		||||
 | 
			
		||||
@ -1,225 +0,0 @@
 | 
			
		||||
/**
 | 
			
		||||
 * Copyright © 2016-2020 The Thingsboard Authors
 | 
			
		||||
 *
 | 
			
		||||
 * Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
 * you may not use this file except in compliance with the License.
 | 
			
		||||
 * You may obtain a copy of the License at
 | 
			
		||||
 *
 | 
			
		||||
 *     http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 *
 | 
			
		||||
 * Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
 * See the License for the specific language governing permissions and
 | 
			
		||||
 * limitations under the License.
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.server.service.transaction;
 | 
			
		||||
 | 
			
		||||
import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
import org.springframework.beans.factory.annotation.Value;
 | 
			
		||||
import org.springframework.stereotype.Service;
 | 
			
		||||
import org.thingsboard.common.util.ThingsBoardThreadFactory;
 | 
			
		||||
import org.thingsboard.rule.engine.api.RuleChainTransactionService;
 | 
			
		||||
import org.thingsboard.server.common.data.id.EntityId;
 | 
			
		||||
import org.thingsboard.server.common.msg.TbMsg;
 | 
			
		||||
import org.thingsboard.server.queue.util.TbRuleEngineComponent;
 | 
			
		||||
import org.thingsboard.server.service.executors.DbCallbackExecutorService;
 | 
			
		||||
 | 
			
		||||
import javax.annotation.PostConstruct;
 | 
			
		||||
import javax.annotation.PreDestroy;
 | 
			
		||||
import java.util.Queue;
 | 
			
		||||
import java.util.concurrent.BlockingQueue;
 | 
			
		||||
import java.util.concurrent.Callable;
 | 
			
		||||
import java.util.concurrent.ConcurrentHashMap;
 | 
			
		||||
import java.util.concurrent.ConcurrentLinkedQueue;
 | 
			
		||||
import java.util.concurrent.ConcurrentMap;
 | 
			
		||||
import java.util.concurrent.ExecutorService;
 | 
			
		||||
import java.util.concurrent.Executors;
 | 
			
		||||
import java.util.concurrent.LinkedBlockingQueue;
 | 
			
		||||
import java.util.concurrent.TimeUnit;
 | 
			
		||||
import java.util.concurrent.locks.Lock;
 | 
			
		||||
import java.util.concurrent.locks.ReentrantLock;
 | 
			
		||||
import java.util.function.Consumer;
 | 
			
		||||
 | 
			
		||||
@Service
 | 
			
		||||
@TbRuleEngineComponent
 | 
			
		||||
@Slf4j
 | 
			
		||||
public class BaseRuleChainTransactionService implements RuleChainTransactionService {
 | 
			
		||||
 | 
			
		||||
    private final DbCallbackExecutorService callbackExecutor;
 | 
			
		||||
 | 
			
		||||
    @Value("${actors.rule.transaction.queue_size}")
 | 
			
		||||
    private int finalQueueSize;
 | 
			
		||||
    @Value("${actors.rule.transaction.duration}")
 | 
			
		||||
    private long duration;
 | 
			
		||||
 | 
			
		||||
    private final Lock transactionLock = new ReentrantLock();
 | 
			
		||||
    private final ConcurrentMap<EntityId, BlockingQueue<TbTransactionTask>> transactionMap = new ConcurrentHashMap<>();
 | 
			
		||||
    private final Queue<TbTransactionTask> timeoutQueue = new ConcurrentLinkedQueue<>();
 | 
			
		||||
 | 
			
		||||
    private ExecutorService timeoutExecutor;
 | 
			
		||||
 | 
			
		||||
    public BaseRuleChainTransactionService(DbCallbackExecutorService callbackExecutor) {
 | 
			
		||||
        this.callbackExecutor = callbackExecutor;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @PostConstruct
 | 
			
		||||
    public void init() {
 | 
			
		||||
        timeoutExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("rule-chain-transaction"));
 | 
			
		||||
        executeOnTimeout();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @PreDestroy
 | 
			
		||||
    public void destroy() {
 | 
			
		||||
        if (timeoutExecutor != null) {
 | 
			
		||||
            timeoutExecutor.shutdownNow();
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void beginTransaction(TbMsg msg, Consumer<TbMsg> onStart, Consumer<TbMsg> onEnd, Consumer<Throwable> onFailure) {
 | 
			
		||||
        transactionLock.lock();
 | 
			
		||||
        try {
 | 
			
		||||
            BlockingQueue<TbTransactionTask> queue = transactionMap.computeIfAbsent(msg.getTransactionData().getOriginatorId(), id ->
 | 
			
		||||
                    new LinkedBlockingQueue<>(finalQueueSize));
 | 
			
		||||
 | 
			
		||||
            TbTransactionTask transactionTask = new TbTransactionTask(msg, onStart, onEnd, onFailure, System.currentTimeMillis() + duration);
 | 
			
		||||
            int queueSize = queue.size();
 | 
			
		||||
            if (queueSize >= finalQueueSize) {
 | 
			
		||||
                log.trace("Queue has no space: {}", transactionTask);
 | 
			
		||||
                executeOnFailure(transactionTask.getOnFailure(), "Queue has no space!");
 | 
			
		||||
            } else {
 | 
			
		||||
                addMsgToQueues(queue, transactionTask);
 | 
			
		||||
                if (queueSize == 0) {
 | 
			
		||||
                    executeOnSuccess(transactionTask.getOnStart(), transactionTask.getMsg());
 | 
			
		||||
                } else {
 | 
			
		||||
                    log.trace("Msg [{}][{}] is waiting to start transaction!", msg.getId(), msg.getType());
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
        } finally {
 | 
			
		||||
            transactionLock.unlock();
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void endTransaction(TbMsg msg, Consumer<TbMsg> onSuccess, Consumer<Throwable> onFailure) {
 | 
			
		||||
        //TODO 2.5
 | 
			
		||||
//        Optional<ServerAddress> address = routingService.resolveById(msg.getTransactionData().getOriginatorId());
 | 
			
		||||
//        if (address.isPresent()) {
 | 
			
		||||
//            sendTransactionEventToRemoteServer(msg, address.get());
 | 
			
		||||
//            executeOnSuccess(onSuccess, msg);
 | 
			
		||||
//        } else {
 | 
			
		||||
            endLocalTransaction(msg, onSuccess, onFailure);
 | 
			
		||||
//        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void addMsgToQueues(BlockingQueue<TbTransactionTask> queue, TbTransactionTask transactionTask) {
 | 
			
		||||
        queue.offer(transactionTask);
 | 
			
		||||
        timeoutQueue.offer(transactionTask);
 | 
			
		||||
        log.trace("Added msg to queue, size: [{}]", queue.size());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void endLocalTransaction(TbMsg msg, Consumer<TbMsg> onSuccess, Consumer<Throwable> onFailure) {
 | 
			
		||||
        transactionLock.lock();
 | 
			
		||||
        try {
 | 
			
		||||
            BlockingQueue<TbTransactionTask> queue = transactionMap.computeIfAbsent(msg.getTransactionData().getOriginatorId(), id ->
 | 
			
		||||
                    new LinkedBlockingQueue<>(finalQueueSize));
 | 
			
		||||
 | 
			
		||||
            TbTransactionTask currentTransactionTask = queue.peek();
 | 
			
		||||
            if (currentTransactionTask != null) {
 | 
			
		||||
                if (currentTransactionTask.getMsg().getTransactionData().getTransactionId().equals(msg.getTransactionData().getTransactionId())) {
 | 
			
		||||
                    currentTransactionTask.setCompleted(true);
 | 
			
		||||
                    queue.poll();
 | 
			
		||||
                    log.trace("Removed msg from queue, size [{}]", queue.size());
 | 
			
		||||
 | 
			
		||||
                    executeOnSuccess(currentTransactionTask.getOnEnd(), currentTransactionTask.getMsg());
 | 
			
		||||
                    executeOnSuccess(onSuccess, msg);
 | 
			
		||||
 | 
			
		||||
                    TbTransactionTask nextTransactionTask = queue.peek();
 | 
			
		||||
                    if (nextTransactionTask != null) {
 | 
			
		||||
                        executeOnSuccess(nextTransactionTask.getOnStart(), nextTransactionTask.getMsg());
 | 
			
		||||
                    }
 | 
			
		||||
                } else {
 | 
			
		||||
                    log.trace("Task has expired!");
 | 
			
		||||
                    executeOnFailure(onFailure, "Task has expired!");
 | 
			
		||||
                }
 | 
			
		||||
            } else {
 | 
			
		||||
                log.trace("Queue is empty, previous task has expired!");
 | 
			
		||||
                executeOnFailure(onFailure, "Queue is empty, previous task has expired!");
 | 
			
		||||
            }
 | 
			
		||||
        } finally {
 | 
			
		||||
            transactionLock.unlock();
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void executeOnTimeout() {
 | 
			
		||||
        timeoutExecutor.submit(() -> {
 | 
			
		||||
            while (true) {
 | 
			
		||||
                TbTransactionTask transactionTask = timeoutQueue.peek();
 | 
			
		||||
                if (transactionTask != null) {
 | 
			
		||||
                    long sleepDuration = 0L;
 | 
			
		||||
                    transactionLock.lock();
 | 
			
		||||
                    try {
 | 
			
		||||
                        if (transactionTask.isCompleted()) {
 | 
			
		||||
                            timeoutQueue.poll();
 | 
			
		||||
                        } else {
 | 
			
		||||
                            long expIn = transactionTask.getExpirationTime() - System.currentTimeMillis();
 | 
			
		||||
                            if (expIn < 0) {
 | 
			
		||||
                                log.trace("Task has expired! Deleting it...[{}][{}]", transactionTask.getMsg().getId(), transactionTask.getMsg().getType());
 | 
			
		||||
                                timeoutQueue.poll();
 | 
			
		||||
                                executeOnFailure(transactionTask.getOnFailure(), "Task has expired!");
 | 
			
		||||
 | 
			
		||||
                                BlockingQueue<TbTransactionTask> queue = transactionMap.get(transactionTask.getMsg().getTransactionData().getOriginatorId());
 | 
			
		||||
                                if (queue != null) {
 | 
			
		||||
                                    queue.poll();
 | 
			
		||||
                                    TbTransactionTask nextTransactionTask = queue.peek();
 | 
			
		||||
                                    if (nextTransactionTask != null) {
 | 
			
		||||
                                        executeOnSuccess(nextTransactionTask.getOnStart(), nextTransactionTask.getMsg());
 | 
			
		||||
                                    }
 | 
			
		||||
                                }
 | 
			
		||||
                            } else {
 | 
			
		||||
                                sleepDuration = Math.min(expIn, duration);
 | 
			
		||||
                            }
 | 
			
		||||
                        }
 | 
			
		||||
                    } finally {
 | 
			
		||||
                        transactionLock.unlock();
 | 
			
		||||
                    }
 | 
			
		||||
                    if (sleepDuration > 0L) {
 | 
			
		||||
                        try {
 | 
			
		||||
                            log.trace("Task has not expired! Continue executing...[{}][{}]", transactionTask.getMsg().getId(), transactionTask.getMsg().getType());
 | 
			
		||||
                            TimeUnit.MILLISECONDS.sleep(sleepDuration);
 | 
			
		||||
                        } catch (InterruptedException e) {
 | 
			
		||||
                            throw new IllegalStateException("Thread interrupted", e);
 | 
			
		||||
                        }
 | 
			
		||||
                    }
 | 
			
		||||
                } else {
 | 
			
		||||
                    try {
 | 
			
		||||
                        log.trace("Queue is empty, waiting for tasks!");
 | 
			
		||||
                        TimeUnit.SECONDS.sleep(1);
 | 
			
		||||
                    } catch (InterruptedException e) {
 | 
			
		||||
                        throw new IllegalStateException("Thread interrupted", e);
 | 
			
		||||
                    }
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
        });
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void executeOnFailure(Consumer<Throwable> onFailure, String exception) {
 | 
			
		||||
        executeCallback(() -> {
 | 
			
		||||
            onFailure.accept(new RuntimeException(exception));
 | 
			
		||||
            return null;
 | 
			
		||||
        });
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void executeOnSuccess(Consumer<TbMsg> onSuccess, TbMsg tbMsg) {
 | 
			
		||||
        executeCallback(() -> {
 | 
			
		||||
            onSuccess.accept(tbMsg);
 | 
			
		||||
            return null;
 | 
			
		||||
        });
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void executeCallback(Callable<Void> task) {
 | 
			
		||||
        callbackExecutor.executeAsync(task);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
@ -1,44 +0,0 @@
 | 
			
		||||
/**
 | 
			
		||||
 * Copyright © 2016-2020 The Thingsboard Authors
 | 
			
		||||
 *
 | 
			
		||||
 * Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
 * you may not use this file except in compliance with the License.
 | 
			
		||||
 * You may obtain a copy of the License at
 | 
			
		||||
 *
 | 
			
		||||
 *     http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 *
 | 
			
		||||
 * Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
 * See the License for the specific language governing permissions and
 | 
			
		||||
 * limitations under the License.
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.server.service.transaction;
 | 
			
		||||
 | 
			
		||||
import lombok.AllArgsConstructor;
 | 
			
		||||
import lombok.Data;
 | 
			
		||||
import org.thingsboard.server.common.msg.TbMsg;
 | 
			
		||||
 | 
			
		||||
import java.util.function.Consumer;
 | 
			
		||||
 | 
			
		||||
@Data
 | 
			
		||||
@AllArgsConstructor
 | 
			
		||||
public final class TbTransactionTask {
 | 
			
		||||
 | 
			
		||||
    private final TbMsg msg;
 | 
			
		||||
    private final Consumer<TbMsg> onStart;
 | 
			
		||||
    private final Consumer<TbMsg> onEnd;
 | 
			
		||||
    private final Consumer<Throwable> onFailure;
 | 
			
		||||
    private final long expirationTime;
 | 
			
		||||
 | 
			
		||||
    private boolean isCompleted;
 | 
			
		||||
 | 
			
		||||
    public TbTransactionTask(TbMsg msg, Consumer<TbMsg> onStart, Consumer<TbMsg> onEnd, Consumer<Throwable> onFailure, long expirationTime) {
 | 
			
		||||
        this.msg = msg;
 | 
			
		||||
        this.onStart = onStart;
 | 
			
		||||
        this.onEnd = onEnd;
 | 
			
		||||
        this.onFailure = onFailure;
 | 
			
		||||
        this.expirationTime = expirationTime;
 | 
			
		||||
        this.isCompleted = false;
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
@ -44,7 +44,6 @@ public final class TbMsg implements Serializable {
 | 
			
		||||
    private final TbMsgMetaData metaData;
 | 
			
		||||
    private final TbMsgDataType dataType;
 | 
			
		||||
    private final String data;
 | 
			
		||||
    private final TbMsgTransactionData transactionData;
 | 
			
		||||
    private final RuleChainId ruleChainId;
 | 
			
		||||
    private final RuleNodeId ruleNodeId;
 | 
			
		||||
    //This field is not serialized because we use queues and there is no need to do it
 | 
			
		||||
@ -68,27 +67,22 @@ public final class TbMsg implements Serializable {
 | 
			
		||||
 | 
			
		||||
    public static TbMsg transformMsg(TbMsg origMsg, String type, EntityId originator, TbMsgMetaData metaData, String data) {
 | 
			
		||||
        return new TbMsg(origMsg.getId(), type, originator, metaData.copy(), origMsg.getDataType(),
 | 
			
		||||
                data, origMsg.getTransactionData(), origMsg.getRuleChainId(), origMsg.getRuleNodeId(), origMsg.getCallback());
 | 
			
		||||
                data, origMsg.getRuleChainId(), origMsg.getRuleNodeId(), origMsg.getCallback());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public static TbMsg newMsg(TbMsg tbMsg, RuleChainId ruleChainId, RuleNodeId ruleNodeId) {
 | 
			
		||||
        return new TbMsg(UUID.randomUUID(), tbMsg.getType(), tbMsg.getOriginator(), tbMsg.getMetaData().copy(), tbMsg.getDataType(), tbMsg.getData(), ruleChainId, ruleNodeId, TbMsgCallback.EMPTY);
 | 
			
		||||
        return new TbMsg(UUID.randomUUID(), tbMsg.getType(), tbMsg.getOriginator(), tbMsg.getMetaData().copy(),
 | 
			
		||||
                tbMsg.getDataType(), tbMsg.getData(), ruleChainId, ruleNodeId, TbMsgCallback.EMPTY);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private TbMsg(UUID id, String type, EntityId originator, TbMsgMetaData metaData, TbMsgDataType dataType, String data,
 | 
			
		||||
                  RuleChainId ruleChainId, RuleNodeId ruleNodeId, TbMsgCallback callback) {
 | 
			
		||||
        this(id, type, originator, metaData, dataType, data, new TbMsgTransactionData(id, originator), ruleChainId, ruleNodeId, callback);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private TbMsg(UUID id, String type, EntityId originator, TbMsgMetaData metaData, TbMsgDataType dataType, String data,
 | 
			
		||||
                  TbMsgTransactionData transactionData, RuleChainId ruleChainId, RuleNodeId ruleNodeId, TbMsgCallback callback) {
 | 
			
		||||
        this.id = id;
 | 
			
		||||
        this.type = type;
 | 
			
		||||
        this.originator = originator;
 | 
			
		||||
        this.metaData = metaData;
 | 
			
		||||
        this.dataType = dataType;
 | 
			
		||||
        this.data = data;
 | 
			
		||||
        this.transactionData = transactionData;
 | 
			
		||||
        this.ruleChainId = ruleChainId;
 | 
			
		||||
        this.ruleNodeId = ruleNodeId;
 | 
			
		||||
        if (callback != null) {
 | 
			
		||||
@ -125,15 +119,6 @@ public final class TbMsg implements Serializable {
 | 
			
		||||
            builder.setMetaData(MsgProtos.TbMsgMetaDataProto.newBuilder().putAllData(msg.getMetaData().getData()).build());
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        TbMsgTransactionData transactionData = msg.getTransactionData();
 | 
			
		||||
        if (transactionData != null) {
 | 
			
		||||
            MsgProtos.TbMsgTransactionDataProto.Builder transactionBuilder = MsgProtos.TbMsgTransactionDataProto.newBuilder();
 | 
			
		||||
            transactionBuilder.setId(transactionData.getTransactionId().toString());
 | 
			
		||||
            transactionBuilder.setEntityType(transactionData.getOriginatorId().getEntityType().name());
 | 
			
		||||
            transactionBuilder.setEntityIdMSB(transactionData.getOriginatorId().getId().getMostSignificantBits());
 | 
			
		||||
            transactionBuilder.setEntityIdLSB(transactionData.getOriginatorId().getId().getLeastSignificantBits());
 | 
			
		||||
            builder.setTransactionData(transactionBuilder.build());
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        builder.setDataType(msg.getDataType().ordinal());
 | 
			
		||||
        builder.setData(msg.getData());
 | 
			
		||||
@ -144,9 +129,6 @@ public final class TbMsg implements Serializable {
 | 
			
		||||
        try {
 | 
			
		||||
            MsgProtos.TbMsgProto proto = MsgProtos.TbMsgProto.parseFrom(data);
 | 
			
		||||
            TbMsgMetaData metaData = new TbMsgMetaData(proto.getMetaData().getDataMap());
 | 
			
		||||
            EntityId transactionEntityId = EntityIdFactory.getByTypeAndUuid(proto.getTransactionData().getEntityType(),
 | 
			
		||||
                    new UUID(proto.getTransactionData().getEntityIdMSB(), proto.getTransactionData().getEntityIdLSB()));
 | 
			
		||||
            TbMsgTransactionData transactionData = new TbMsgTransactionData(UUID.fromString(proto.getTransactionData().getId()), transactionEntityId);
 | 
			
		||||
            EntityId entityId = EntityIdFactory.getByTypeAndUuid(proto.getEntityType(), new UUID(proto.getEntityIdMSB(), proto.getEntityIdLSB()));
 | 
			
		||||
            RuleChainId ruleChainId = null;
 | 
			
		||||
            RuleNodeId ruleNodeId = null;
 | 
			
		||||
@ -157,17 +139,17 @@ public final class TbMsg implements Serializable {
 | 
			
		||||
                ruleNodeId = new RuleNodeId(new UUID(proto.getRuleNodeIdMSB(), proto.getRuleNodeIdLSB()));
 | 
			
		||||
            }
 | 
			
		||||
            TbMsgDataType dataType = TbMsgDataType.values()[proto.getDataType()];
 | 
			
		||||
            return new TbMsg(UUID.fromString(proto.getId()), proto.getType(), entityId, metaData, dataType, proto.getData(), transactionData, ruleChainId, ruleNodeId, callback);
 | 
			
		||||
            return new TbMsg(UUID.fromString(proto.getId()), proto.getType(), entityId, metaData, dataType, proto.getData(), ruleChainId, ruleNodeId, callback);
 | 
			
		||||
        } catch (InvalidProtocolBufferException e) {
 | 
			
		||||
            throw new IllegalStateException("Could not parse protobuf for TbMsg", e);
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public TbMsg copyWithRuleChainId(RuleChainId ruleChainId) {
 | 
			
		||||
        return new TbMsg(this.id, this.type, this.originator, this.metaData, this.dataType, this.data, this.transactionData, ruleChainId, null, callback);
 | 
			
		||||
        return new TbMsg(this.id, this.type, this.originator, this.metaData, this.dataType, this.data, ruleChainId, null, callback);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public TbMsg copyWithRuleNodeId(RuleChainId ruleChainId, RuleNodeId ruleNodeId) {
 | 
			
		||||
        return new TbMsg(this.id, this.type, this.originator, this.metaData, this.dataType, this.data, this.transactionData, ruleChainId, ruleNodeId, callback);
 | 
			
		||||
        return new TbMsg(this.id, this.type, this.originator, this.metaData, this.dataType, this.data, ruleChainId, ruleNodeId, callback);
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -1,30 +0,0 @@
 | 
			
		||||
/**
 | 
			
		||||
 * Copyright © 2016-2020 The Thingsboard Authors
 | 
			
		||||
 *
 | 
			
		||||
 * Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
 * you may not use this file except in compliance with the License.
 | 
			
		||||
 * You may obtain a copy of the License at
 | 
			
		||||
 *
 | 
			
		||||
 *     http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 *
 | 
			
		||||
 * Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
 * See the License for the specific language governing permissions and
 | 
			
		||||
 * limitations under the License.
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.server.common.msg;
 | 
			
		||||
 | 
			
		||||
import lombok.Data;
 | 
			
		||||
import org.thingsboard.server.common.data.id.EntityId;
 | 
			
		||||
 | 
			
		||||
import java.io.Serializable;
 | 
			
		||||
import java.util.UUID;
 | 
			
		||||
 | 
			
		||||
@Data
 | 
			
		||||
public final class TbMsgTransactionData implements Serializable {
 | 
			
		||||
 | 
			
		||||
    private final UUID transactionId;
 | 
			
		||||
    private final EntityId originatorId;
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
@ -23,13 +23,6 @@ message TbMsgMetaDataProto {
 | 
			
		||||
    map<string, string> data = 1;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
message TbMsgTransactionDataProto {
 | 
			
		||||
    string id = 1;
 | 
			
		||||
    string entityType = 2;
 | 
			
		||||
    int64 entityIdMSB = 3;
 | 
			
		||||
    int64 entityIdLSB = 4;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
message TbMsgProto {
 | 
			
		||||
    string id = 1;
 | 
			
		||||
    string type = 2;
 | 
			
		||||
@ -46,7 +39,7 @@ message TbMsgProto {
 | 
			
		||||
 | 
			
		||||
    TbMsgMetaDataProto metaData = 11;
 | 
			
		||||
 | 
			
		||||
    TbMsgTransactionDataProto transactionData = 12;
 | 
			
		||||
    //Transaction Data (12) was removed in 2.5
 | 
			
		||||
 | 
			
		||||
    int32 dataType = 13;
 | 
			
		||||
    string data = 14;
 | 
			
		||||
 | 
			
		||||
@ -1,28 +0,0 @@
 | 
			
		||||
/**
 | 
			
		||||
 * Copyright © 2016-2020 The Thingsboard Authors
 | 
			
		||||
 *
 | 
			
		||||
 * Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
 * you may not use this file except in compliance with the License.
 | 
			
		||||
 * You may obtain a copy of the License at
 | 
			
		||||
 *
 | 
			
		||||
 *     http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 *
 | 
			
		||||
 * Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
 * See the License for the specific language governing permissions and
 | 
			
		||||
 * limitations under the License.
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.rule.engine.api;
 | 
			
		||||
 | 
			
		||||
import org.thingsboard.server.common.msg.TbMsg;
 | 
			
		||||
 | 
			
		||||
import java.util.function.Consumer;
 | 
			
		||||
 | 
			
		||||
public interface RuleChainTransactionService {
 | 
			
		||||
 | 
			
		||||
    void beginTransaction(TbMsg msg, Consumer<TbMsg> onStart, Consumer<TbMsg> onEnd, Consumer<Throwable> onFailure);
 | 
			
		||||
 | 
			
		||||
    void endTransaction(TbMsg msg, Consumer<TbMsg> onSuccess, Consumer<Throwable> onFailure);
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
@ -203,8 +203,6 @@ public interface TbContext {
 | 
			
		||||
 | 
			
		||||
    String getServiceId();
 | 
			
		||||
 | 
			
		||||
    RuleChainTransactionService getRuleChainTransactionService();
 | 
			
		||||
 | 
			
		||||
    EventLoopGroup getSharedEventLoop();
 | 
			
		||||
 | 
			
		||||
    CassandraCluster getCassandraCluster();
 | 
			
		||||
 | 
			
		||||
@ -25,10 +25,6 @@ import org.thingsboard.rule.engine.api.TbNodeException;
 | 
			
		||||
import org.thingsboard.rule.engine.api.util.TbNodeUtils;
 | 
			
		||||
import org.thingsboard.server.common.data.plugin.ComponentType;
 | 
			
		||||
import org.thingsboard.server.common.msg.TbMsg;
 | 
			
		||||
import org.thingsboard.server.common.msg.TbMsgDataType;
 | 
			
		||||
import org.thingsboard.server.common.msg.TbMsgTransactionData;
 | 
			
		||||
 | 
			
		||||
import java.util.concurrent.ExecutionException;
 | 
			
		||||
 | 
			
		||||
import static org.thingsboard.rule.engine.api.TbRelationTypes.SUCCESS;
 | 
			
		||||
 | 
			
		||||
@ -43,31 +39,17 @@ import static org.thingsboard.rule.engine.api.TbRelationTypes.SUCCESS;
 | 
			
		||||
                "Size of the queue per originator and timeout values are configurable on a system level",
 | 
			
		||||
        uiResources = {"static/rulenode/rulenode-core-config.js"},
 | 
			
		||||
        configDirective = "tbNodeEmptyConfig")
 | 
			
		||||
@Deprecated
 | 
			
		||||
public class TbSynchronizationBeginNode implements TbNode {
 | 
			
		||||
 | 
			
		||||
    private EmptyNodeConfiguration config;
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
 | 
			
		||||
        this.config = TbNodeUtils.convert(configuration, EmptyNodeConfiguration.class);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void onMsg(TbContext ctx, TbMsg msg) {
 | 
			
		||||
        log.trace("Msg enters transaction - [{}][{}]", msg.getId(), msg.getType());
 | 
			
		||||
 | 
			
		||||
        TbMsgTransactionData transactionData = new TbMsgTransactionData(msg.getId(), msg.getOriginator());
 | 
			
		||||
        //TODO 2.5: Callback?
 | 
			
		||||
        TbMsg tbMsg = TbMsg.transformMsg(msg, msg.getType(), msg.getOriginator(), msg.getMetaData(), msg.getData());
 | 
			
		||||
 | 
			
		||||
        ctx.getRuleChainTransactionService().beginTransaction(tbMsg, startMsg -> {
 | 
			
		||||
                    log.trace("Transaction starting...[{}][{}]", startMsg.getId(), startMsg.getType());
 | 
			
		||||
                    ctx.tellNext(startMsg, SUCCESS);
 | 
			
		||||
                }, endMsg -> log.trace("Transaction ended successfully...[{}][{}]", endMsg.getId(), endMsg.getType()),
 | 
			
		||||
                throwable -> {
 | 
			
		||||
                    log.trace("Transaction failed! [{}][{}]", tbMsg.getId(), tbMsg.getType(), throwable);
 | 
			
		||||
                    ctx.tellFailure(tbMsg, throwable);
 | 
			
		||||
                });
 | 
			
		||||
        log.warn("Synchronization Start/End nodes are deprecated since TB 2.5. Use queue with submit strategy SEQUENTIAL_WITHIN_ORIGINATOR instead.");
 | 
			
		||||
        ctx.tellSuccess(msg);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
 | 
			
		||||
@ -40,25 +40,20 @@ import static org.thingsboard.rule.engine.api.TbRelationTypes.SUCCESS;
 | 
			
		||||
        uiResources = {"static/rulenode/rulenode-core-config.js"},
 | 
			
		||||
        configDirective = ("tbNodeEmptyConfig")
 | 
			
		||||
)
 | 
			
		||||
@Deprecated
 | 
			
		||||
public class TbSynchronizationEndNode implements TbNode {
 | 
			
		||||
 | 
			
		||||
    private EmptyNodeConfiguration config;
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
 | 
			
		||||
        this.config = TbNodeUtils.convert(configuration, EmptyNodeConfiguration.class);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void onMsg(TbContext ctx, TbMsg msg) {
 | 
			
		||||
        ctx.getRuleChainTransactionService().endTransaction(msg,
 | 
			
		||||
                successMsg -> ctx.tellNext(successMsg, SUCCESS),
 | 
			
		||||
                throwable -> ctx.tellFailure(msg, throwable));
 | 
			
		||||
        log.trace("Msg left transaction - [{}][{}]", msg.getId(), msg.getType());
 | 
			
		||||
        log.warn("Synchronization Start/End nodes are deprecated since TB 2.5. Use queue with submit strategy SEQUENTIAL_WITHIN_ORIGINATOR instead.");
 | 
			
		||||
        ctx.tellSuccess(msg);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void destroy() {
 | 
			
		||||
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user