Merge pull request #7895 from adovh/feature/prod-1586_add_quename_to_generator_node

[3.5]add quename config to generator node
This commit is contained in:
Andrew Shvayka 2023-03-06 13:40:02 +02:00 committed by GitHub
commit 1be933622d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 5 additions and 4 deletions

View File

@ -137,7 +137,7 @@ public class TbMsgGeneratorNode implements TbNode {
} }
lastScheduledTs = lastScheduledTs + delay; lastScheduledTs = lastScheduledTs + delay;
long curDelay = Math.max(0L, (lastScheduledTs - curTs)); long curDelay = Math.max(0L, (lastScheduledTs - curTs));
TbMsg tickMsg = ctx.newMsg(null, TB_MSG_GENERATOR_NODE_MSG, ctx.getSelfId(), new TbMsgMetaData(), ""); TbMsg tickMsg = ctx.newMsg(config.getQueueName(), TB_MSG_GENERATOR_NODE_MSG, ctx.getSelfId(), new TbMsgMetaData(), "");
nextTickId = tickMsg.getId(); nextTickId = tickMsg.getId();
ctx.tellSelf(tickMsg, curDelay); ctx.tellSelf(tickMsg, curDelay);
} }
@ -145,14 +145,14 @@ public class TbMsgGeneratorNode implements TbNode {
private ListenableFuture<TbMsg> generate(TbContext ctx, TbMsg msg) { private ListenableFuture<TbMsg> generate(TbContext ctx, TbMsg msg) {
log.trace("generate, config {}", config); log.trace("generate, config {}", config);
if (prevMsg == null) { if (prevMsg == null) {
prevMsg = ctx.newMsg(null, "", originatorId, msg.getCustomerId(), new TbMsgMetaData(), "{}"); prevMsg = ctx.newMsg(config.getQueueName(), "", originatorId, msg.getCustomerId(), new TbMsgMetaData(), "{}");
} }
if (initialized.get()) { if (initialized.get()) {
ctx.logJsEvalRequest(); ctx.logJsEvalRequest();
return Futures.transformAsync(scriptEngine.executeGenerateAsync(prevMsg), generated -> { return Futures.transformAsync(scriptEngine.executeGenerateAsync(prevMsg), generated -> {
log.trace("generate process response, generated {}, config {}", generated, config); log.trace("generate process response, generated {}, config {}", generated, config);
ctx.logJsEvalResponse(); ctx.logJsEvalResponse();
prevMsg = ctx.newMsg(null, generated.getType(), originatorId, msg.getCustomerId(), generated.getMetaData(), generated.getData()); prevMsg = ctx.newMsg(config.getQueueName(), generated.getType(), originatorId, msg.getCustomerId(), generated.getMetaData(), generated.getData());
return Futures.immediateFuture(prevMsg); return Futures.immediateFuture(prevMsg);
}, MoreExecutors.directExecutor()); //usually it runs on js-executor-remote-callback thread pool }, MoreExecutors.directExecutor()); //usually it runs on js-executor-remote-callback thread pool
} }

View File

@ -36,6 +36,7 @@ public class TbMsgGeneratorNodeConfiguration implements NodeConfiguration<TbMsgG
private ScriptLanguage scriptLang; private ScriptLanguage scriptLang;
private String jsScript; private String jsScript;
private String tbelScript; private String tbelScript;
private String queueName;
@Override @Override
public TbMsgGeneratorNodeConfiguration defaultConfiguration() { public TbMsgGeneratorNodeConfiguration defaultConfiguration() {