Merge pull request #6906 from vvlladd28/bug/js-executors/aws-sqs-queue

[3.4] Fix incorrect start JavaScript Executor AWS SQS queue
This commit is contained in:
Andrew Shvayka 2022-07-08 17:12:35 +03:00 committed by GitHub
commit 36de871027
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 26 additions and 36 deletions

View File

@ -39,12 +39,6 @@ export function isString(value: any): boolean {
return typeof value === 'string'; return typeof value === 'string';
} }
export function sleep(ms: number): Promise<void> {
return new Promise((resolve) => {
setTimeout(resolve, ms);
});
}
export function parseJsErrorDetails(err: any): string | undefined { export function parseJsErrorDetails(err: any): string | undefined {
if (!err) { if (!err) {
return undefined; return undefined;

View File

@ -34,7 +34,6 @@ import {
SQSClient SQSClient
} from '@aws-sdk/client-sqs'; } from '@aws-sdk/client-sqs';
import uuid from 'uuid-random'; import uuid from 'uuid-random';
import { sleep } from '../api/utils';
export class AwsSqsTemplate implements IQueue { export class AwsSqsTemplate implements IQueue {
@ -48,11 +47,11 @@ export class AwsSqsTemplate implements IQueue {
private sqsClient: SQSClient; private sqsClient: SQSClient;
private requestQueueURL: string private requestQueueURL: string
private stopped = false;
private queueUrls = new Map<string, string>(); private queueUrls = new Map<string, string>();
private queueAttributes: { [n: string]: string } = { private queueAttributes: { [n: string]: string } = {
FifoQueue: 'true' FifoQueue: 'true'
}; };
private timer: NodeJS.Timer;
name = 'AWS SQS'; name = 'AWS SQS';
@ -91,40 +90,37 @@ export class AwsSqsTemplate implements IQueue {
const params: ReceiveMessageRequest = { const params: ReceiveMessageRequest = {
MaxNumberOfMessages: 10, MaxNumberOfMessages: 10,
QueueUrl: this.requestQueueURL, QueueUrl: this.requestQueueURL,
WaitTimeSeconds: this.pollInterval / 1000 WaitTimeSeconds: Math.ceil(this.pollInterval / 10)
}; };
while (!this.stopped) { this.timer = setTimeout(() => {this.getAndProcessMessage(messageProcessor, params)}, this.pollInterval);
let pollStartTs = new Date().getTime(); }
const messagesResponse: ReceiveMessageResult = await this.sqsClient.send(new ReceiveMessageCommand(params));
const messages = messagesResponse.Messages;
if (messages && messages.length > 0) { private async getAndProcessMessage(messageProcessor: JsInvokeMessageProcessor, params: ReceiveMessageRequest) {
const entries: DeleteMessageBatchRequestEntry[] = []; const messagesResponse: ReceiveMessageResult = await this.sqsClient.send(new ReceiveMessageCommand(params));
const messages = messagesResponse.Messages;
messages.forEach(message => { if (messages && messages.length > 0) {
entries.push({ const entries: DeleteMessageBatchRequestEntry[] = [];
Id: message.MessageId,
ReceiptHandle: message.ReceiptHandle messages.forEach(message => {
}); entries.push({
messageProcessor.onJsInvokeMessage(JSON.parse(message.Body || '')); Id: message.MessageId,
ReceiptHandle: message.ReceiptHandle
}); });
messageProcessor.onJsInvokeMessage(JSON.parse(message.Body || ''));
});
const deleteBatch: DeleteMessageBatchRequest = { const deleteBatch: DeleteMessageBatchRequest = {
QueueUrl: this.requestQueueURL, QueueUrl: this.requestQueueURL,
Entries: entries Entries: entries
}; };
try { try {
await this.sqsClient.send(new DeleteMessageBatchCommand(deleteBatch)) await this.sqsClient.send(new DeleteMessageBatchCommand(deleteBatch))
} catch (err: any) { } catch (err: any) {
this.logger.error("Failed to delete messages from queue.", err.message); this.logger.error("Failed to delete messages from queue.", err.message);
}
} else {
let pollDuration = new Date().getTime() - pollStartTs;
if (pollDuration < this.pollInterval) {
await sleep(this.pollInterval - pollDuration);
}
} }
} }
this.timer = setTimeout(() => {this.getAndProcessMessage(messageProcessor, params)}, this.pollInterval);
} }
async send(responseTopic: string, scriptId: string, rawResponse: Buffer, headers: any): Promise<any> { async send(responseTopic: string, scriptId: string, rawResponse: Buffer, headers: any): Promise<any> {
@ -182,8 +178,8 @@ export class AwsSqsTemplate implements IQueue {
} }
async destroy(): Promise<void> { async destroy(): Promise<void> {
this.stopped = true;
this.logger.info('Stopping AWS SQS resources...'); this.logger.info('Stopping AWS SQS resources...');
clearTimeout(this.timer);
if (this.sqsClient) { if (this.sqsClient) {
this.logger.info('Stopping AWS SQS client...'); this.logger.info('Stopping AWS SQS client...');
try { try {