带有 SQS 的 wkhtmltoimage,Bluebird 承诺,一次 15 个
wkhtmltoimage with SQS, Bluebird promise, 15 at a time
我有以下 nodejs 程序从 SQS 队列中检索大量网站 url 并使用 wkhtmltoimage 对它们进行截图,一次 15 个:
var concurrency=15;
//break down queue size into batches of concurrency size
var getTotalNumberMessages = sqs.getQueueAttributesAsync({
QueueUrl: queueUrl,
AttributeNames: ['All']
}).then(function(data) {
var total = Array(Math.floor(parseInt(data.Attributes.ApproximateNumberOfMessages)/ concurrency));
Promise.each(total, function (value, index, length) {
var toDo = Array(Math.floor(concurrency / 10) + 1);
messages = [];
var gmPromises = [];
Promise.each(toDo, function (value, index, length) {
gmPromises.push(
sqs.receiveMessageAsync({
QueueUrl: queueUrl,
WaitTimeSeconds: 20,
VisibilityTimeout: 120,
MaxNumberOfMessages: (concurrency < 10 ? concurrency : 10)
}).then(function (data) {
if (data.Messages.length == 0) {
done = true;
} else {
messages = messages.concat(data.Messages);
}
})
);
}).then(function() {
Promise.all(gmPromises).then(function () {
var promises = [];
Promise.map(messages, function (message) {
var tmpFilename = '/tmp/' + md5(s3key) + '.png';
var process = spawn('/opt/wkhtmltox/bin/wkhtmltoimage', ['--width', '1300', '--height', '900', body.url, tmpFilename]);
console.log('Running wkhtmltoimage ' + body.url + ' ' + tmpFilename);
process.stdout.on('data', function (data) {
console.log(data.toString());
});
process.stderr.on('data', function (err) {
console.log(err.toString());
});
return new Promise(function (resolve, reject) {
process.on('exit', function (code) {
if (code == 0) {
fs.readFile(tmpFilename, {}, function (err, data) {
if (err) {
throw err;
} else {
var fileData = Buffer.from(data, 'binary');
var s3 = new AWS.S3();
s3.putObject({
Bucket: 'mybucket',
Key: s3key,
Body: fileData,
ACL: 'public-read'
}, function (err, resp) {
var deleteMessagePromise = sqs.deleteMessage({
QueueUrl: queueUrl,
ReceiptHandle: message.ReceiptHandle
}).promise();
deleteMessagePromise.catch(function (err) {
console.log('SQS deleteMessage failed: ', err, err.stack);
});
promises.push(deleteMessagePromise);
console.log(arguments);
console.log('Successfully uploaded package.');
resolve();
});
}
}
);
}
});
});
}, {
concurrency: 15
});
return Promise.all(promises);
});
});
});
但是我发现有超过 15 个 wkhtmltoimage 运行 并行。从 SQS 中检索 15 条消息的批次似乎是并行的,即使我使用的是 Bluebird 的 Promise.each?
在我上面的代码中,我所做的是
- 找出我的队列中有多少项目
- 除以并发处理的项目数 -> 组数
- 每组
- 一次处理 15 个
- 循环到下一组
因为所有相关代码都使用异步调用,所以我队列中的所有项目将被并行处理,这不是代码的意图。相反,我想限制并发性,所以我只使用 wkhtmltoimage
使用的 x 内存量
我需要的是从我的队列中接收 15 个项目,然后停止接收项目,直到我的一些 wkhtml 进程完成。然后我可以接收物品,直到我再次有 15 个 运行 处理。
经过大量研究后,我发现这就是 node.js streams do when in paused mode。我还找到了一个 node.js 包,它将 SQS 队列包装成一个流:sqs-readable-stream
这是我的最终代码:
var sqsStream = new SQSReadableStream({
sqsClient: sqs,
queueUrl: queueUrl
});
var collect = [];
sqsStream.on('data', function(message){
collect.push(message);
if (collect.length >= concurrency){
sqsStream.pause();
}
var body = JSON.parse(message.Body);
var s3key = id + '/' + befAft + '/' + body.url.replace(/http(s)?:\/\//, '') + '.png';
var tmpFilename = '/tmp/' + md5(s3key) + '.png';
var process = spawn('/opt/wkhtmltox/bin/wkhtmltoimage', ['--width', '1300', '--height', '900', body.url, tmpFilename]);
console.log('Running wkhtmltoimage ' + body.url + ' ' + tmpFilename);
process.stdout.on('data', function (data) {
//console.log(data.toString());
});
process.stderr.on('data', function (err) {
//console.log(err.toString());
});
process.on('exit', function (code) {
if (code == 0) {
fs.readFile(tmpFilename, {}, function (err, data) {
if (err) {
throw err;
} else {
var fileData = Buffer.from(data, 'binary');
var s3 = new AWS.S3();
s3.putObject({
Bucket: 'somebucket',
Key: s3key,
Body: fileData
}, function (err, resp) {
message.deleteMessage();
console.log(arguments);
console.log('Successfully uploaded package.');
collect.pop();
sqsStream.resume()
});
}
}
);
}
});
});
我有以下 nodejs 程序从 SQS 队列中检索大量网站 url 并使用 wkhtmltoimage 对它们进行截图,一次 15 个:
var concurrency=15;
//break down queue size into batches of concurrency size
var getTotalNumberMessages = sqs.getQueueAttributesAsync({
QueueUrl: queueUrl,
AttributeNames: ['All']
}).then(function(data) {
var total = Array(Math.floor(parseInt(data.Attributes.ApproximateNumberOfMessages)/ concurrency));
Promise.each(total, function (value, index, length) {
var toDo = Array(Math.floor(concurrency / 10) + 1);
messages = [];
var gmPromises = [];
Promise.each(toDo, function (value, index, length) {
gmPromises.push(
sqs.receiveMessageAsync({
QueueUrl: queueUrl,
WaitTimeSeconds: 20,
VisibilityTimeout: 120,
MaxNumberOfMessages: (concurrency < 10 ? concurrency : 10)
}).then(function (data) {
if (data.Messages.length == 0) {
done = true;
} else {
messages = messages.concat(data.Messages);
}
})
);
}).then(function() {
Promise.all(gmPromises).then(function () {
var promises = [];
Promise.map(messages, function (message) {
var tmpFilename = '/tmp/' + md5(s3key) + '.png';
var process = spawn('/opt/wkhtmltox/bin/wkhtmltoimage', ['--width', '1300', '--height', '900', body.url, tmpFilename]);
console.log('Running wkhtmltoimage ' + body.url + ' ' + tmpFilename);
process.stdout.on('data', function (data) {
console.log(data.toString());
});
process.stderr.on('data', function (err) {
console.log(err.toString());
});
return new Promise(function (resolve, reject) {
process.on('exit', function (code) {
if (code == 0) {
fs.readFile(tmpFilename, {}, function (err, data) {
if (err) {
throw err;
} else {
var fileData = Buffer.from(data, 'binary');
var s3 = new AWS.S3();
s3.putObject({
Bucket: 'mybucket',
Key: s3key,
Body: fileData,
ACL: 'public-read'
}, function (err, resp) {
var deleteMessagePromise = sqs.deleteMessage({
QueueUrl: queueUrl,
ReceiptHandle: message.ReceiptHandle
}).promise();
deleteMessagePromise.catch(function (err) {
console.log('SQS deleteMessage failed: ', err, err.stack);
});
promises.push(deleteMessagePromise);
console.log(arguments);
console.log('Successfully uploaded package.');
resolve();
});
}
}
);
}
});
});
}, {
concurrency: 15
});
return Promise.all(promises);
});
});
});
但是我发现有超过 15 个 wkhtmltoimage 运行 并行。从 SQS 中检索 15 条消息的批次似乎是并行的,即使我使用的是 Bluebird 的 Promise.each?
在我上面的代码中,我所做的是
- 找出我的队列中有多少项目
- 除以并发处理的项目数 -> 组数
- 每组
- 一次处理 15 个
- 循环到下一组
因为所有相关代码都使用异步调用,所以我队列中的所有项目将被并行处理,这不是代码的意图。相反,我想限制并发性,所以我只使用 wkhtmltoimage
使用的 x 内存量我需要的是从我的队列中接收 15 个项目,然后停止接收项目,直到我的一些 wkhtml 进程完成。然后我可以接收物品,直到我再次有 15 个 运行 处理。
经过大量研究后,我发现这就是 node.js streams do when in paused mode。我还找到了一个 node.js 包,它将 SQS 队列包装成一个流:sqs-readable-stream
这是我的最终代码:
var sqsStream = new SQSReadableStream({
sqsClient: sqs,
queueUrl: queueUrl
});
var collect = [];
sqsStream.on('data', function(message){
collect.push(message);
if (collect.length >= concurrency){
sqsStream.pause();
}
var body = JSON.parse(message.Body);
var s3key = id + '/' + befAft + '/' + body.url.replace(/http(s)?:\/\//, '') + '.png';
var tmpFilename = '/tmp/' + md5(s3key) + '.png';
var process = spawn('/opt/wkhtmltox/bin/wkhtmltoimage', ['--width', '1300', '--height', '900', body.url, tmpFilename]);
console.log('Running wkhtmltoimage ' + body.url + ' ' + tmpFilename);
process.stdout.on('data', function (data) {
//console.log(data.toString());
});
process.stderr.on('data', function (err) {
//console.log(err.toString());
});
process.on('exit', function (code) {
if (code == 0) {
fs.readFile(tmpFilename, {}, function (err, data) {
if (err) {
throw err;
} else {
var fileData = Buffer.from(data, 'binary');
var s3 = new AWS.S3();
s3.putObject({
Bucket: 'somebucket',
Key: s3key,
Body: fileData
}, function (err, resp) {
message.deleteMessage();
console.log(arguments);
console.log('Successfully uploaded package.');
collect.pop();
sqsStream.resume()
});
}
}
);
}
});
});