Added ability to process tellNext messages from Queue

This commit is contained in:
Andrii Shvaika 2020-04-06 19:40:38 +03:00
parent c52d0d26d3
commit ff3fd89ace
3 changed files with 49 additions and 35 deletions

View File

@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@ -40,6 +40,7 @@ import org.thingsboard.server.common.data.SearchTextBasedWithAdditionalInfo;
import org.thingsboard.server.common.data.alarm.Alarm;
import org.thingsboard.server.common.data.asset.Asset;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.RuleChainId;
import org.thingsboard.server.common.data.id.RuleNodeId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.rule.RuleNode;
@ -175,6 +176,9 @@ class DefaultTbContext implements TbContext {
}
private void enqueueForTellNext(TopicPartitionInfo tpi, TbMsg tbMsg, Set<String> relationTypes, Runnable onSuccess, Consumer<Throwable> onFailure) {
RuleChainId ruleChainId = nodeCtx.getSelf().getRuleChainId();
RuleNodeId ruleNodeId = nodeCtx.getSelf().getId();
tbMsg = TbMsg.newMsg(tbMsg, ruleChainId, ruleNodeId);
TransportProtos.ToRuleEngineMsg msg = TransportProtos.ToRuleEngineMsg.newBuilder()
.setTenantIdMSB(getTenantId().getId().getMostSignificantBits())
.setTenantIdLSB(getTenantId().getId().getLeastSignificantBits())

View File

@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@ -34,18 +34,18 @@ import org.thingsboard.server.common.data.relation.EntityRelation;
import org.thingsboard.server.common.data.rule.RuleChain;
import org.thingsboard.server.common.data.rule.RuleNode;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.queue.PartitionChangeMsg;
import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
import org.thingsboard.server.common.msg.queue.PartitionChangeMsg;
import org.thingsboard.server.common.msg.queue.QueueToRuleEngineMsg;
import org.thingsboard.server.dao.rule.RuleChainService;
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
import org.thingsboard.server.queue.common.MultipleTbQueueTbMsgCallbackWrapper;
import org.thingsboard.server.queue.TbQueueCallback;
import org.thingsboard.server.queue.TbQueueProducer;
import org.thingsboard.server.queue.common.TbQueueTbMsgCallbackWrapper;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.dao.rule.RuleChainService;
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
import org.thingsboard.server.queue.TbQueueCallback;
import org.thingsboard.server.queue.TbQueueProducer;
import org.thingsboard.server.queue.common.MultipleTbQueueTbMsgCallbackWrapper;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.common.TbQueueTbMsgCallbackWrapper;
import java.util.ArrayList;
import java.util.Collections;
@ -194,25 +194,29 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
void onQueueToRuleEngineMsg(QueueToRuleEngineMsg envelope) {
TbMsg msg = envelope.getTbMsg();
log.trace("[{}][{}] Processing message [{}]: {}", entityId, firstId, msg.getId(), msg);
try {
checkActive();
RuleNodeId targetId = msg.getRuleNodeId();
RuleNodeCtx targetCtx;
if (targetId == null) {
targetCtx = firstNode;
msg = msg.copyWithRuleChainId(entityId);
} else {
targetCtx = nodeActors.get(targetId);
if (envelope.getRelationTypes() == null) {
try {
checkActive();
RuleNodeId targetId = msg.getRuleNodeId();
RuleNodeCtx targetCtx;
if (targetId == null) {
targetCtx = firstNode;
msg = msg.copyWithRuleChainId(entityId);
} else {
targetCtx = nodeActors.get(targetId);
}
if (targetCtx != null) {
log.trace("[{}][{}] Pushing message to target rule node", entityId, targetId);
pushMsgToNode(targetCtx, msg, "");
} else {
log.trace("[{}][{}] Rule node does not exist. Probably old message", entityId, targetId);
msg.getCallback().onSuccess();
}
} catch (Exception e) {
envelope.getTbMsg().getCallback().onFailure(e);
}
if (targetCtx != null) {
log.trace("[{}][{}] Pushing message to target rule node", entityId, targetId);
pushMsgToNode(firstNode, msg, "");
} else {
log.trace("[{}][{}] Rule node does not exist. Probably old message", entityId, targetId);
msg.getCallback().onSuccess();
}
} catch (Exception e) {
envelope.getTbMsg().getCallback().onFailure(e);
} else {
onTellNext(envelope.getTbMsg(), envelope.getTbMsg().getRuleNodeId(), envelope.getRelationTypes());
}
}
@ -226,22 +230,24 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
}
void onTellNext(RuleNodeToRuleChainTellNextMsg envelope) {
TbMsg msg = envelope.getMsg();
onTellNext(envelope.getMsg(), envelope.getOriginator(), envelope.getRelationTypes());
}
private void onTellNext(TbMsg msg, RuleNodeId originatorNodeId, Set<String> relationTypes) {
try {
checkActive();
EntityId entityId = msg.getOriginator();
TopicPartitionInfo tpi = systemContext.resolve(ServiceType.TB_RULE_ENGINE, tenantId, entityId);
RuleNodeId originatorNodeId = envelope.getOriginator();
List<RuleNodeRelation> relations = nodeRoutes.get(originatorNodeId).stream()
.filter(r -> contains(envelope.getRelationTypes(), r.getType()))
.filter(r -> contains(relationTypes, r.getType()))
.collect(Collectors.toList());
int relationsCount = relations.size();
if (relationsCount == 0) {
log.trace("[{}][{}][{}] No outbound relations to process", tenantId, entityId, msg.getId());
if (envelope.getRelationTypes().contains(TbRelationTypes.FAILURE)) {
log.debug("[{}] Failure during message processing by Rule Node [{}]. Enable and see debug events for more info", entityId, envelope.getOriginator().getId());
if (relationTypes.contains(TbRelationTypes.FAILURE)) {
log.debug("[{}] Failure during message processing by Rule Node [{}]. Enable and see debug events for more info", entityId, originatorNodeId.getId());
//TODO 2.5: Introduce our own RuleEngineFailureException to track what is wrong
msg.getCallback().onFailure(new RuntimeException("Failure during message processing by Rule Node [" + envelope.getOriginator().getId().toString() + "]"));
msg.getCallback().onFailure(new RuntimeException("Failure during message processing by Rule Node [" + originatorNodeId.getId().toString() + "]"));
} else {
msg.getCallback().onSuccess();
}

View File

@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@ -73,6 +73,10 @@ public final class TbMsg implements Serializable {
data, origMsg.getTransactionData(), origMsg.getRuleChainId(), origMsg.getRuleNodeId(), origMsg.getCallback());
}
public static TbMsg newMsg(TbMsg tbMsg, RuleChainId ruleChainId, RuleNodeId ruleNodeId) {
return new TbMsg(UUID.randomUUID(), tbMsg.getType(), tbMsg.getOriginator(), tbMsg.getMetaData().copy(), tbMsg.getDataType(), tbMsg.getData(), ruleChainId, ruleNodeId, TbMsgCallback.EMPTY);
}
private TbMsg(UUID id, String type, EntityId originator, TbMsgMetaData metaData, TbMsgDataType dataType, String data,
RuleChainId ruleChainId, RuleNodeId ruleNodeId, TbMsgCallback callback) {
this(id, type, originator, metaData, dataType, data, new TbMsgTransactionData(id, originator), ruleChainId, ruleNodeId, callback);