diff --git a/msa/js-executor/api/utils.ts b/msa/js-executor/api/utils.ts index b43a5f05f2..361025f806 100644 --- a/msa/js-executor/api/utils.ts +++ b/msa/js-executor/api/utils.ts @@ -39,12 +39,6 @@ export function isString(value: any): boolean { return typeof value === 'string'; } -export function sleep(ms: number): Promise { - return new Promise((resolve) => { - setTimeout(resolve, ms); - }); -} - export function parseJsErrorDetails(err: any): string | undefined { if (!err) { return undefined; diff --git a/msa/js-executor/queue/awsSqsTemplate.ts b/msa/js-executor/queue/awsSqsTemplate.ts index 36ec9b5270..259d285cf2 100644 --- a/msa/js-executor/queue/awsSqsTemplate.ts +++ b/msa/js-executor/queue/awsSqsTemplate.ts @@ -34,7 +34,6 @@ import { SQSClient } from '@aws-sdk/client-sqs'; import uuid from 'uuid-random'; -import { sleep } from '../api/utils'; export class AwsSqsTemplate implements IQueue { @@ -48,11 +47,11 @@ export class AwsSqsTemplate implements IQueue { private sqsClient: SQSClient; private requestQueueURL: string - private stopped = false; private queueUrls = new Map(); private queueAttributes: { [n: string]: string } = { FifoQueue: 'true' }; + private timer: NodeJS.Timer; name = 'AWS SQS'; @@ -91,40 +90,37 @@ export class AwsSqsTemplate implements IQueue { const params: ReceiveMessageRequest = { MaxNumberOfMessages: 10, QueueUrl: this.requestQueueURL, - WaitTimeSeconds: this.pollInterval / 1000 + WaitTimeSeconds: Math.ceil(this.pollInterval / 10) }; - while (!this.stopped) { - let pollStartTs = new Date().getTime(); - const messagesResponse: ReceiveMessageResult = await this.sqsClient.send(new ReceiveMessageCommand(params)); - const messages = messagesResponse.Messages; + this.timer = setTimeout(() => {this.getAndProcessMessage(messageProcessor, params)}, this.pollInterval); + } - if (messages && messages.length > 0) { - const entries: DeleteMessageBatchRequestEntry[] = []; + private async getAndProcessMessage(messageProcessor: JsInvokeMessageProcessor, params: ReceiveMessageRequest) { + const messagesResponse: ReceiveMessageResult = await this.sqsClient.send(new ReceiveMessageCommand(params)); + const messages = messagesResponse.Messages; - messages.forEach(message => { - entries.push({ - Id: message.MessageId, - ReceiptHandle: message.ReceiptHandle - }); - messageProcessor.onJsInvokeMessage(JSON.parse(message.Body || '')); + if (messages && messages.length > 0) { + const entries: DeleteMessageBatchRequestEntry[] = []; + + messages.forEach(message => { + entries.push({ + Id: message.MessageId, + ReceiptHandle: message.ReceiptHandle }); + messageProcessor.onJsInvokeMessage(JSON.parse(message.Body || '')); + }); - const deleteBatch: DeleteMessageBatchRequest = { - QueueUrl: this.requestQueueURL, - Entries: entries - }; - try { - await this.sqsClient.send(new DeleteMessageBatchCommand(deleteBatch)) - } catch (err: any) { - this.logger.error("Failed to delete messages from queue.", err.message); - } - } else { - let pollDuration = new Date().getTime() - pollStartTs; - if (pollDuration < this.pollInterval) { - await sleep(this.pollInterval - pollDuration); - } + const deleteBatch: DeleteMessageBatchRequest = { + QueueUrl: this.requestQueueURL, + Entries: entries + }; + try { + await this.sqsClient.send(new DeleteMessageBatchCommand(deleteBatch)) + } catch (err: any) { + this.logger.error("Failed to delete messages from queue.", err.message); } } + this.timer = setTimeout(() => {this.getAndProcessMessage(messageProcessor, params)}, this.pollInterval); } async send(responseTopic: string, scriptId: string, rawResponse: Buffer, headers: any): Promise { @@ -182,8 +178,8 @@ export class AwsSqsTemplate implements IQueue { } async destroy(): Promise { - this.stopped = true; this.logger.info('Stopping AWS SQS resources...'); + clearTimeout(this.timer); if (this.sqsClient) { this.logger.info('Stopping AWS SQS client...'); try {