used enqueue for input node for avoid latency
This commit is contained in:
parent
7a29894f16
commit
d3d20aa449
@ -172,8 +172,18 @@ class DefaultTbContext implements TbContext {
|
||||
|
||||
@Override
|
||||
public void input(TbMsg msg, RuleChainId ruleChainId) {
|
||||
ack(msg);
|
||||
if (!msg.isValid()) {
|
||||
return;
|
||||
}
|
||||
msg.pushToStack(nodeCtx.getSelf().getRuleChainId(), nodeCtx.getSelf().getId());
|
||||
nodeCtx.getChainActor().tell(new RuleChainInputMsg(ruleChainId, msg));
|
||||
TbMsg inputMsg = msg.copyWithRuleChainId(ruleChainId);
|
||||
TransportProtos.ToRuleEngineMsg toReMsg = TransportProtos.ToRuleEngineMsg.newBuilder()
|
||||
.setTenantIdMSB(getTenantId().getId().getMostSignificantBits())
|
||||
.setTenantIdLSB(getTenantId().getId().getLeastSignificantBits())
|
||||
.setTbMsg(TbMsg.toByteString(inputMsg)).build();
|
||||
TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, getQueueName(), getTenantId(), inputMsg.getOriginator());
|
||||
mainCtx.getClusterService().pushMsgToRuleEngine(tpi, inputMsg.getId(), toReMsg, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user