将异步等待与事件一起使用

Using Async await with Events

我正在尝试使用 nodejs readline 逐行读取文件,对于每一行我想异步执行一些功能,然后继续直到文件末尾。

const readline = require("readline");
const fs = require("fs");

let rl = readline.createInterface({
    input: fs.createReadStream('b'),
    crlfDelay: Infinity
});

rl.on('line', async (line) => {
    console.log('start line');
    await both();
    console.log('end line');
});

rl.on('close', () => {
    console.log('read complete');
});

function one() {
    return new Promise((resolve, reject) => {
        setTimeout(() => resolve('two'), 2000);
    });
}

function two() {
    return new Promise((resolve, reject) => {
        setTimeout(() => resolve('two'), 3000);
    });
}
async function both() {
    ap = [];
    ap.push(one());
    ap.push(two());
    console.log('processing line');
    await Promise.all(ap);
    console.log('line processed');
}

文件 b 可以是任何包含几行的文件,例如

1
2
3
4
5
6
7

我期望的输出是这样的:

start line
line processing
line processed
end line
.
.
.

但是,我无法维持秩序。

据我了解,似乎正在发出 'line' 事件,它一次又一次地调用回调!。

有什么方法可以让这个事件等待,直到手头的事件被异步处理(各种步骤 运行 异步),然后重复。

**重要更新** 因此用例文件将包含大约 >5GB 的 CSV 文本。 我们的内存限制为 <3GB,最长时间为 15 分钟 (AWS Lambda)。

To my understanding, it seems the 'line' event is getting emitted which is calling the callback again and again!.

是的,我认为这也是问题。

问题类似于Producer Consumer problem

您可以做的是创建一个事件列表,并在调用时将行事件添加到事件列表中。 唯一的区别是生产者(正在创建的事件)永远不会填满缓冲区。但是需要提醒消费者(函数both)消费剩下的事件。如果没有事件发生,消费者就进入休眠状态。每次有新的事件,生产者检查消费者是否醒了,如果没有,就唤醒消费者。

你的解决方案应该是-

const readline = require("readline");
const fs = require("fs");

let rl = readline.createInterface({
    input: fs.createReadStream('b'),
    crlfDelay: Infinity
});

const lineEventsToProcess = [];
let bothRunning = false;
rl.on('line', (line) => {
    // Add the line event to the list of line events
    lineEventsToProcess.push(line)
    // Both is not running i.e. the consumer is asleep
    if (!bothRunning) {
        both()
    }
});

rl.on('close', () => {
    console.log('read complete');
});

function one() {
    return new Promise((resolve, reject) => {
        setTimeout(() => resolve('two'), 2000);
    });
}

function two() {
    return new Promise((resolve, reject) => {
        setTimeout(() => resolve('two'), 3000);
    });
}
async function both() {
    // Set bothRunning to true
    bothRunning = true;

    while(lineEventsToProcess.length > 0) {
        console.log('start line');
        ap = [];
        ap.push(one());
        ap.push(two());
        console.log('processing line');
        await Promise.all(ap);
        console.log('line processed');

        // Remove the first element
        lineEventsToProcess.splice(0, 1)
        console.log('end line');
    }

    // Both is not running anymore
    bothRunning = false;
}

通过将行事件替换为 setInterval 对其进行了一些修改,以便我可以对其进行测试。如果你想在浏览器中测试它或者有人有类似的问题,就在这里-

const lineEventsToProcess = [];
let bothRunning = false;
setInterval(() => {
    // Add the line event to the list of line events
    lineEventsToProcess.push(1)
    // Both is not running i.e. the consumer is asleep
    if (!bothRunning) {
        both();
    }
}, 100);

function one() {
    return new Promise((resolve, reject) => {
        setTimeout(() => resolve('two'), 2000);
    });
}

function two() {
    return new Promise((resolve, reject) => {
        setTimeout(() => resolve('two'), 3000);
    });
}
async function both() {
    // Set bothRunning to true
    bothRunning = true;

    while(lineEventsToProcess.length > 0) {
        console.log('start line');
        ap = [];
        ap.push(one());
        ap.push(two());
        console.log('processing line');
        await Promise.all(ap);
        console.log('line processed');

        // Remove the first element
        lineEventsToProcess.splice(0, 1)
        console.log('end line');
    }

    // Both is not running anymore
    bothRunning = false;
}

如果您需要更多解释,请在下方评论。

我并不是说这是最好的解决方案,但它应该可行。如果你想改进这一点,我建议通过为生产者和消费者创建 类 来模块化代码。网上有大量解决生产者-消费者问题的方法。