minor refactoring due to comments
This commit is contained in:
parent
726c95c661
commit
b0a5a97bab
@ -110,7 +110,6 @@ import org.thingsboard.server.dao.widget.WidgetTypeService;
|
||||
import org.thingsboard.server.dao.widget.WidgetsBundleService;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos;
|
||||
import org.thingsboard.server.queue.TbQueueCallback;
|
||||
import org.thingsboard.server.queue.TbQueueMsgMetadata;
|
||||
import org.thingsboard.server.queue.common.SimpleTbQueueCallback;
|
||||
import org.thingsboard.server.service.executors.PubSubRuleNodeExecutorProvider;
|
||||
import org.thingsboard.server.service.script.RuleNodeJsScriptEngine;
|
||||
@ -177,24 +176,10 @@ class DefaultTbContext implements TbContext {
|
||||
if (!msg.isValid()) {
|
||||
return;
|
||||
}
|
||||
TbMsg inputMsg = msg.copyWithRuleChainId(ruleChainId);
|
||||
inputMsg.pushToStack(nodeCtx.getSelf().getRuleChainId(), nodeCtx.getSelf().getId());
|
||||
TransportProtos.ToRuleEngineMsg toReMsg = TransportProtos.ToRuleEngineMsg.newBuilder()
|
||||
.setTenantIdMSB(getTenantId().getId().getMostSignificantBits())
|
||||
.setTenantIdLSB(getTenantId().getId().getLeastSignificantBits())
|
||||
.setTbMsg(TbMsg.toByteString(inputMsg)).build();
|
||||
TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, getQueueName(), getTenantId(), inputMsg.getOriginator());
|
||||
mainCtx.getClusterService().pushMsgToRuleEngine(tpi, inputMsg.getId(), toReMsg, new TbQueueCallback() {
|
||||
@Override
|
||||
public void onSuccess(TbQueueMsgMetadata metadata) {
|
||||
ack(msg);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable error) {
|
||||
tellFailure(msg, error);
|
||||
}
|
||||
});
|
||||
TbMsg tbMsg = msg.copyWithRuleChainId(ruleChainId);
|
||||
tbMsg.pushToStack(nodeCtx.getSelf().getRuleChainId(), nodeCtx.getSelf().getId());
|
||||
TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, getQueueName(), getTenantId(), tbMsg.getOriginator());
|
||||
doEnqueue(tpi, tbMsg, new SimpleTbQueueCallback(md -> ack(msg), t -> tellFailure(msg, t)));
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -230,14 +215,10 @@ class DefaultTbContext implements TbContext {
|
||||
}
|
||||
return;
|
||||
}
|
||||
TransportProtos.ToRuleEngineMsg msg = TransportProtos.ToRuleEngineMsg.newBuilder()
|
||||
.setTenantIdMSB(getTenantId().getId().getMostSignificantBits())
|
||||
.setTenantIdLSB(getTenantId().getId().getLeastSignificantBits())
|
||||
.setTbMsg(TbMsg.toByteString(tbMsg)).build();
|
||||
if (nodeCtx.getSelf().isDebugMode()) {
|
||||
mainCtx.persistDebugOutput(nodeCtx.getTenantId(), nodeCtx.getSelf().getId(), tbMsg, "To Root Rule Chain");
|
||||
}
|
||||
mainCtx.getClusterService().pushMsgToRuleEngine(tpi, tbMsg.getId(), msg, new SimpleTbQueueCallback(
|
||||
doEnqueue(tpi, tbMsg, new SimpleTbQueueCallback(
|
||||
metadata -> {
|
||||
if (onSuccess != null) {
|
||||
onSuccess.run();
|
||||
@ -252,6 +233,14 @@ class DefaultTbContext implements TbContext {
|
||||
}));
|
||||
}
|
||||
|
||||
private void doEnqueue(TopicPartitionInfo tpi, TbMsg tbMsg, TbQueueCallback callback) {
|
||||
TransportProtos.ToRuleEngineMsg msg = TransportProtos.ToRuleEngineMsg.newBuilder()
|
||||
.setTenantIdMSB(getTenantId().getId().getMostSignificantBits())
|
||||
.setTenantIdLSB(getTenantId().getId().getLeastSignificantBits())
|
||||
.setTbMsg(TbMsg.toByteString(tbMsg)).build();
|
||||
mainCtx.getClusterService().pushMsgToRuleEngine(tpi, tbMsg.getId(), msg, callback);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void enqueueForTellFailure(TbMsg tbMsg, String failureMessage) {
|
||||
TopicPartitionInfo tpi = resolvePartition(tbMsg);
|
||||
|
||||
@ -333,7 +333,7 @@ public abstract class AbstractRuleEngineFlowIntegrationTest extends AbstractRule
|
||||
RuleChain finalRuleChain = rootRuleChain;
|
||||
RuleNode lastRuleNode = secondaryMetaData.getNodes().stream().filter(node -> !node.getId().equals(finalRuleChain.getFirstRuleNodeId())).findFirst().get();
|
||||
|
||||
Awaitility.await().atMost(3, TimeUnit.SECONDS)
|
||||
Awaitility.await().atMost(TIMEOUT, TimeUnit.SECONDS)
|
||||
.until(() ->
|
||||
getDebugEvents(savedTenant.getId(), lastRuleNode.getId(), 1000)
|
||||
.getData()
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user