带有 SQS 的 wkhtmltoimage,Bluebird 承诺,一次 15 个

wkhtmltoimage with SQS, Bluebird promise, 15 at a time

我有以下 nodejs 程序从 SQS 队列中检索大量网站 url 并使用 wkhtmltoimage 对它们进行截图,一次 15 个:

    var concurrency=15;
    //break down queue size into batches of concurrency size

    var getTotalNumberMessages = sqs.getQueueAttributesAsync({
        QueueUrl: queueUrl,
        AttributeNames: ['All']
    }).then(function(data) {

        var total = Array(Math.floor(parseInt(data.Attributes.ApproximateNumberOfMessages)/ concurrency));

        Promise.each(total, function (value, index, length) {

            var toDo = Array(Math.floor(concurrency / 10) + 1);
            messages = [];
            var gmPromises = [];
            Promise.each(toDo, function (value, index, length) {
                gmPromises.push(
                    sqs.receiveMessageAsync({
                        QueueUrl: queueUrl,
                        WaitTimeSeconds: 20,
                        VisibilityTimeout: 120, 
                        MaxNumberOfMessages: (concurrency < 10 ? concurrency : 10)
                    }).then(function (data) {
                        if (data.Messages.length == 0) {
                            done = true;
                        } else {
                            messages = messages.concat(data.Messages);
                        }
                    })
                );
            }).then(function() {
                Promise.all(gmPromises).then(function () {

                    var promises = [];

                    Promise.map(messages, function (message) {

                        var tmpFilename = '/tmp/' + md5(s3key) + '.png';

                        var process = spawn('/opt/wkhtmltox/bin/wkhtmltoimage', ['--width', '1300', '--height', '900', body.url, tmpFilename]);

                        console.log('Running wkhtmltoimage ' + body.url + ' ' + tmpFilename);
                        process.stdout.on('data', function (data) {
                            console.log(data.toString());
                        });

                        process.stderr.on('data', function (err) {
                            console.log(err.toString());
                        });

                        return new Promise(function (resolve, reject) {
                            process.on('exit', function (code) {
                                if (code == 0) {
                                    fs.readFile(tmpFilename, {}, function (err, data) {
                                            if (err) {
                                                throw err;
                                            } else {
                                                var fileData = Buffer.from(data, 'binary');

                                                var s3 = new AWS.S3();
                                                s3.putObject({
                                                    Bucket: 'mybucket',
                                                    Key: s3key,
                                                    Body: fileData,
                                                    ACL: 'public-read'
                                                }, function (err, resp) {
                                                    var deleteMessagePromise = sqs.deleteMessage({
                                                        QueueUrl: queueUrl,
                                                        ReceiptHandle: message.ReceiptHandle
                                                    }).promise();
                                                    deleteMessagePromise.catch(function (err) {
                                                        console.log('SQS deleteMessage failed: ', err, err.stack);
                                                    });
                                                    promises.push(deleteMessagePromise);

                                                    console.log(arguments);
                                                    console.log('Successfully uploaded package.');
                                                    resolve();
                                                });
                                            }
                                        }
                                    );
                                }
                            });
                        });
                    }, {
                        concurrency: 15
                    });


                    return Promise.all(promises);
                });
            });
        });

但是我发现有超过 15 个 wkhtmltoimage 运行 并行。从 SQS 中检索 15 条消息的批次似乎是并行的,即使我使用的是 Bluebird 的 Promise.each?

在我上面的代码中,我所做的是

  • 找出我的队列中有多少项目
  • 除以并发处理的项目数 -> 组数
  • 每组
    • 一次处理 15 个
    • 循环到下一组

因为所有相关代码都使用异步调用,所以我队列中的所有项目将被并行处理,这不是代码的意图。相反,我想限制并发性,所以我只使用 wkhtmltoimage

使用的 x 内存量

我需要的是从我的队列中接收 15 个项目,然后停止接收项目,直到我的一些 wkhtml 进程完成。然后我可以接收物品,直到我再次有 15 个 运行 处理。

经过大量研究后,我发现这就是 node.js streams do when in paused mode。我还找到了一个 node.js 包,它将 SQS 队列包装成一个流:sqs-readable-stream

这是我的最终代码:

    var sqsStream = new SQSReadableStream({
        sqsClient: sqs,
        queueUrl: queueUrl
    });

    var collect = [];

    sqsStream.on('data', function(message){

        collect.push(message);

        if (collect.length >= concurrency){
            sqsStream.pause();
        }

        var body = JSON.parse(message.Body);

        var s3key = id + '/' + befAft + '/' + body.url.replace(/http(s)?:\/\//, '') + '.png';

        var tmpFilename = '/tmp/' + md5(s3key) + '.png';

        var process = spawn('/opt/wkhtmltox/bin/wkhtmltoimage', ['--width', '1300', '--height', '900', body.url, tmpFilename]);

        console.log('Running wkhtmltoimage ' + body.url + ' ' + tmpFilename);
        process.stdout.on('data', function (data) {
            //console.log(data.toString());
        });

        process.stderr.on('data', function (err) {
            //console.log(err.toString());
        });

        process.on('exit', function (code) {
            if (code == 0) {
                fs.readFile(tmpFilename, {}, function (err, data) {
                        if (err) {
                            throw err;
                        } else {
                            var fileData = Buffer.from(data, 'binary');

                            var s3 = new AWS.S3();
                            s3.putObject({
                                Bucket: 'somebucket',
                                Key: s3key,
                                Body: fileData
                            }, function (err, resp) {
                                message.deleteMessage();
                                console.log(arguments);
                                console.log('Successfully uploaded package.');
                                collect.pop();
                                sqsStream.resume()
                            });
                        }
                    }
                );
            }
        });

    });