minor refactoring, ack used if push is success

This commit is contained in:
YevhenBondarenko 2024-10-18 17:05:22 +02:00
parent d3d20aa449
commit e2a356703f

View File

@ -109,6 +109,8 @@ import org.thingsboard.server.dao.user.UserService;
import org.thingsboard.server.dao.widget.WidgetTypeService; import org.thingsboard.server.dao.widget.WidgetTypeService;
import org.thingsboard.server.dao.widget.WidgetsBundleService; import org.thingsboard.server.dao.widget.WidgetsBundleService;
import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.queue.TbQueueCallback;
import org.thingsboard.server.queue.TbQueueMsgMetadata;
import org.thingsboard.server.queue.common.SimpleTbQueueCallback; import org.thingsboard.server.queue.common.SimpleTbQueueCallback;
import org.thingsboard.server.service.executors.PubSubRuleNodeExecutorProvider; import org.thingsboard.server.service.executors.PubSubRuleNodeExecutorProvider;
import org.thingsboard.server.service.script.RuleNodeJsScriptEngine; import org.thingsboard.server.service.script.RuleNodeJsScriptEngine;
@ -172,18 +174,27 @@ class DefaultTbContext implements TbContext {
@Override @Override
public void input(TbMsg msg, RuleChainId ruleChainId) { public void input(TbMsg msg, RuleChainId ruleChainId) {
ack(msg);
if (!msg.isValid()) { if (!msg.isValid()) {
return; return;
} }
msg.pushToStack(nodeCtx.getSelf().getRuleChainId(), nodeCtx.getSelf().getId());
TbMsg inputMsg = msg.copyWithRuleChainId(ruleChainId); TbMsg inputMsg = msg.copyWithRuleChainId(ruleChainId);
inputMsg.pushToStack(nodeCtx.getSelf().getRuleChainId(), nodeCtx.getSelf().getId());
TransportProtos.ToRuleEngineMsg toReMsg = TransportProtos.ToRuleEngineMsg.newBuilder() TransportProtos.ToRuleEngineMsg toReMsg = TransportProtos.ToRuleEngineMsg.newBuilder()
.setTenantIdMSB(getTenantId().getId().getMostSignificantBits()) .setTenantIdMSB(getTenantId().getId().getMostSignificantBits())
.setTenantIdLSB(getTenantId().getId().getLeastSignificantBits()) .setTenantIdLSB(getTenantId().getId().getLeastSignificantBits())
.setTbMsg(TbMsg.toByteString(inputMsg)).build(); .setTbMsg(TbMsg.toByteString(inputMsg)).build();
TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, getQueueName(), getTenantId(), inputMsg.getOriginator()); TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, getQueueName(), getTenantId(), inputMsg.getOriginator());
mainCtx.getClusterService().pushMsgToRuleEngine(tpi, inputMsg.getId(), toReMsg, null); mainCtx.getClusterService().pushMsgToRuleEngine(tpi, inputMsg.getId(), toReMsg, new TbQueueCallback() {
@Override
public void onSuccess(TbQueueMsgMetadata metadata) {
ack(msg);
}
@Override
public void onFailure(Throwable error) {
tellFailure(msg, error);
}
});
} }
@Override @Override