Merge pull request #6904 from vvlladd28/bug/js-executor/rabbit-mq

[3.4] Fix incorrect shutdown JavaScript Executor RabbitMQ queue
This commit is contained in:
Andrew Shvayka 2022-07-08 12:08:29 +03:00 committed by GitHub
commit a5d1eb0454
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -20,7 +20,6 @@ import { JsInvokeMessageProcessor } from '../api/jsInvokeMessageProcessor'
import { IQueue } from './queue.models'; import { IQueue } from './queue.models';
import amqp, { ConfirmChannel, Connection } from 'amqplib'; import amqp, { ConfirmChannel, Connection } from 'amqplib';
import { Options, Replies } from 'amqplib/properties'; import { Options, Replies } from 'amqplib/properties';
import { sleep } from '../api/utils';
export class RabbitMqTemplate implements IQueue { export class RabbitMqTemplate implements IQueue {
@ -32,7 +31,6 @@ export class RabbitMqTemplate implements IQueue {
private username = config.get('rabbitmq.username'); private username = config.get('rabbitmq.username');
private password = config.get('rabbitmq.password'); private password = config.get('rabbitmq.password');
private queueProperties: string = config.get('rabbitmq.queue_properties'); private queueProperties: string = config.get('rabbitmq.queue_properties');
private pollInterval = Number(config.get('js.response_poll_interval'));
private queueOptions: Options.AssertQueue = { private queueOptions: Options.AssertQueue = {
durable: false, durable: false,
@ -41,7 +39,6 @@ export class RabbitMqTemplate implements IQueue {
}; };
private connection: Connection; private connection: Connection;
private channel: ConfirmChannel; private channel: ConfirmChannel;
private stopped = false;
private topics: string[] = []; private topics: string[] = [];
name = 'RabbitMQ'; name = 'RabbitMQ';
@ -60,20 +57,12 @@ export class RabbitMqTemplate implements IQueue {
const messageProcessor = new JsInvokeMessageProcessor(this); const messageProcessor = new JsInvokeMessageProcessor(this);
while (!this.stopped) { await this.channel.consume(this.requestTopic, (message) => {
let pollStartTs = new Date().getTime();
let message = await this.channel.get(this.requestTopic);
if (message) { if (message) {
messageProcessor.onJsInvokeMessage(JSON.parse(message.content.toString('utf8'))); messageProcessor.onJsInvokeMessage(JSON.parse(message.content.toString('utf8')));
this.channel.ack(message); this.channel.ack(message);
} else {
let pollDuration = new Date().getTime() - pollStartTs;
if (pollDuration < this.pollInterval) {
await sleep(this.pollInterval - pollDuration);
}
} }
} })
} }
async send(responseTopic: string, scriptId: string, rawResponse: Buffer, headers: any): Promise<any> { async send(responseTopic: string, scriptId: string, rawResponse: Buffer, headers: any): Promise<any> {