diff --git a/msa/js-executor/queue/rabbitmqTemplate.ts b/msa/js-executor/queue/rabbitmqTemplate.ts index 9369f170f5..ccd3cef54b 100644 --- a/msa/js-executor/queue/rabbitmqTemplate.ts +++ b/msa/js-executor/queue/rabbitmqTemplate.ts @@ -20,7 +20,6 @@ import { JsInvokeMessageProcessor } from '../api/jsInvokeMessageProcessor' import { IQueue } from './queue.models'; import amqp, { ConfirmChannel, Connection } from 'amqplib'; import { Options, Replies } from 'amqplib/properties'; -import { sleep } from '../api/utils'; export class RabbitMqTemplate implements IQueue { @@ -32,7 +31,6 @@ export class RabbitMqTemplate implements IQueue { private username = config.get('rabbitmq.username'); private password = config.get('rabbitmq.password'); private queueProperties: string = config.get('rabbitmq.queue_properties'); - private pollInterval = Number(config.get('js.response_poll_interval')); private queueOptions: Options.AssertQueue = { durable: false, @@ -41,7 +39,6 @@ export class RabbitMqTemplate implements IQueue { }; private connection: Connection; private channel: ConfirmChannel; - private stopped = false; private topics: string[] = []; name = 'RabbitMQ'; @@ -60,20 +57,12 @@ export class RabbitMqTemplate implements IQueue { const messageProcessor = new JsInvokeMessageProcessor(this); - while (!this.stopped) { - let pollStartTs = new Date().getTime(); - let message = await this.channel.get(this.requestTopic); - + await this.channel.consume(this.requestTopic, (message) => { if (message) { messageProcessor.onJsInvokeMessage(JSON.parse(message.content.toString('utf8'))); this.channel.ack(message); - } else { - let pollDuration = new Date().getTime() - pollStartTs; - if (pollDuration < this.pollInterval) { - await sleep(this.pollInterval - pollDuration); - } } - } + }) } async send(responseTopic: string, scriptId: string, rawResponse: Buffer, headers: any): Promise {