将异步等待与事件一起使用
Using Async await with Events
我正在尝试使用 nodejs readline
逐行读取文件,对于每一行我想异步执行一些功能,然后继续直到文件末尾。
const readline = require("readline");
const fs = require("fs");
let rl = readline.createInterface({
input: fs.createReadStream('b'),
crlfDelay: Infinity
});
rl.on('line', async (line) => {
console.log('start line');
await both();
console.log('end line');
});
rl.on('close', () => {
console.log('read complete');
});
function one() {
return new Promise((resolve, reject) => {
setTimeout(() => resolve('two'), 2000);
});
}
function two() {
return new Promise((resolve, reject) => {
setTimeout(() => resolve('two'), 3000);
});
}
async function both() {
ap = [];
ap.push(one());
ap.push(two());
console.log('processing line');
await Promise.all(ap);
console.log('line processed');
}
文件 b
可以是任何包含几行的文件,例如
1
2
3
4
5
6
7
我期望的输出是这样的:
start line
line processing
line processed
end line
.
.
.
但是,我无法维持秩序。
据我了解,似乎正在发出 'line'
事件,它一次又一次地调用回调!。
有什么方法可以让这个事件等待,直到手头的事件被异步处理(各种步骤 运行 异步),然后重复。
**重要更新**
因此用例文件将包含大约 >5GB 的 CSV 文本。
我们的内存限制为 <3GB,最长时间为 15 分钟 (AWS Lambda)。
To my understanding, it seems the 'line' event is getting emitted
which is calling the callback again and again!.
是的,我认为这也是问题。
问题类似于Producer Consumer problem。
您可以做的是创建一个事件列表,并在调用时将行事件添加到事件列表中。
唯一的区别是生产者(正在创建的事件)永远不会填满缓冲区。但是需要提醒消费者(函数both
)消费剩下的事件。如果没有事件发生,消费者就进入休眠状态。每次有新的事件,生产者检查消费者是否醒了,如果没有,就唤醒消费者。
你的解决方案应该是-
const readline = require("readline");
const fs = require("fs");
let rl = readline.createInterface({
input: fs.createReadStream('b'),
crlfDelay: Infinity
});
const lineEventsToProcess = [];
let bothRunning = false;
rl.on('line', (line) => {
// Add the line event to the list of line events
lineEventsToProcess.push(line)
// Both is not running i.e. the consumer is asleep
if (!bothRunning) {
both()
}
});
rl.on('close', () => {
console.log('read complete');
});
function one() {
return new Promise((resolve, reject) => {
setTimeout(() => resolve('two'), 2000);
});
}
function two() {
return new Promise((resolve, reject) => {
setTimeout(() => resolve('two'), 3000);
});
}
async function both() {
// Set bothRunning to true
bothRunning = true;
while(lineEventsToProcess.length > 0) {
console.log('start line');
ap = [];
ap.push(one());
ap.push(two());
console.log('processing line');
await Promise.all(ap);
console.log('line processed');
// Remove the first element
lineEventsToProcess.splice(0, 1)
console.log('end line');
}
// Both is not running anymore
bothRunning = false;
}
通过将行事件替换为 setInterval 对其进行了一些修改,以便我可以对其进行测试。如果你想在浏览器中测试它或者有人有类似的问题,就在这里-
const lineEventsToProcess = [];
let bothRunning = false;
setInterval(() => {
// Add the line event to the list of line events
lineEventsToProcess.push(1)
// Both is not running i.e. the consumer is asleep
if (!bothRunning) {
both();
}
}, 100);
function one() {
return new Promise((resolve, reject) => {
setTimeout(() => resolve('two'), 2000);
});
}
function two() {
return new Promise((resolve, reject) => {
setTimeout(() => resolve('two'), 3000);
});
}
async function both() {
// Set bothRunning to true
bothRunning = true;
while(lineEventsToProcess.length > 0) {
console.log('start line');
ap = [];
ap.push(one());
ap.push(two());
console.log('processing line');
await Promise.all(ap);
console.log('line processed');
// Remove the first element
lineEventsToProcess.splice(0, 1)
console.log('end line');
}
// Both is not running anymore
bothRunning = false;
}
如果您需要更多解释,请在下方评论。
我并不是说这是最好的解决方案,但它应该可行。如果你想改进这一点,我建议通过为生产者和消费者创建 类 来模块化代码。网上有大量解决生产者-消费者问题的方法。
我正在尝试使用 nodejs readline
逐行读取文件,对于每一行我想异步执行一些功能,然后继续直到文件末尾。
const readline = require("readline");
const fs = require("fs");
let rl = readline.createInterface({
input: fs.createReadStream('b'),
crlfDelay: Infinity
});
rl.on('line', async (line) => {
console.log('start line');
await both();
console.log('end line');
});
rl.on('close', () => {
console.log('read complete');
});
function one() {
return new Promise((resolve, reject) => {
setTimeout(() => resolve('two'), 2000);
});
}
function two() {
return new Promise((resolve, reject) => {
setTimeout(() => resolve('two'), 3000);
});
}
async function both() {
ap = [];
ap.push(one());
ap.push(two());
console.log('processing line');
await Promise.all(ap);
console.log('line processed');
}
文件 b
可以是任何包含几行的文件,例如
1
2
3
4
5
6
7
我期望的输出是这样的:
start line
line processing
line processed
end line
.
.
.
但是,我无法维持秩序。
据我了解,似乎正在发出 'line'
事件,它一次又一次地调用回调!。
有什么方法可以让这个事件等待,直到手头的事件被异步处理(各种步骤 运行 异步),然后重复。
**重要更新** 因此用例文件将包含大约 >5GB 的 CSV 文本。 我们的内存限制为 <3GB,最长时间为 15 分钟 (AWS Lambda)。
To my understanding, it seems the 'line' event is getting emitted which is calling the callback again and again!.
是的,我认为这也是问题。
问题类似于Producer Consumer problem。
您可以做的是创建一个事件列表,并在调用时将行事件添加到事件列表中。
唯一的区别是生产者(正在创建的事件)永远不会填满缓冲区。但是需要提醒消费者(函数both
)消费剩下的事件。如果没有事件发生,消费者就进入休眠状态。每次有新的事件,生产者检查消费者是否醒了,如果没有,就唤醒消费者。
你的解决方案应该是-
const readline = require("readline");
const fs = require("fs");
let rl = readline.createInterface({
input: fs.createReadStream('b'),
crlfDelay: Infinity
});
const lineEventsToProcess = [];
let bothRunning = false;
rl.on('line', (line) => {
// Add the line event to the list of line events
lineEventsToProcess.push(line)
// Both is not running i.e. the consumer is asleep
if (!bothRunning) {
both()
}
});
rl.on('close', () => {
console.log('read complete');
});
function one() {
return new Promise((resolve, reject) => {
setTimeout(() => resolve('two'), 2000);
});
}
function two() {
return new Promise((resolve, reject) => {
setTimeout(() => resolve('two'), 3000);
});
}
async function both() {
// Set bothRunning to true
bothRunning = true;
while(lineEventsToProcess.length > 0) {
console.log('start line');
ap = [];
ap.push(one());
ap.push(two());
console.log('processing line');
await Promise.all(ap);
console.log('line processed');
// Remove the first element
lineEventsToProcess.splice(0, 1)
console.log('end line');
}
// Both is not running anymore
bothRunning = false;
}
通过将行事件替换为 setInterval 对其进行了一些修改,以便我可以对其进行测试。如果你想在浏览器中测试它或者有人有类似的问题,就在这里-
const lineEventsToProcess = [];
let bothRunning = false;
setInterval(() => {
// Add the line event to the list of line events
lineEventsToProcess.push(1)
// Both is not running i.e. the consumer is asleep
if (!bothRunning) {
both();
}
}, 100);
function one() {
return new Promise((resolve, reject) => {
setTimeout(() => resolve('two'), 2000);
});
}
function two() {
return new Promise((resolve, reject) => {
setTimeout(() => resolve('two'), 3000);
});
}
async function both() {
// Set bothRunning to true
bothRunning = true;
while(lineEventsToProcess.length > 0) {
console.log('start line');
ap = [];
ap.push(one());
ap.push(two());
console.log('processing line');
await Promise.all(ap);
console.log('line processed');
// Remove the first element
lineEventsToProcess.splice(0, 1)
console.log('end line');
}
// Both is not running anymore
bothRunning = false;
}
如果您需要更多解释,请在下方评论。
我并不是说这是最好的解决方案,但它应该可行。如果你想改进这一点,我建议通过为生产者和消费者创建 类 来模块化代码。网上有大量解决生产者-消费者问题的方法。