可以让事件处理程序等到异步/基于 Promise 的代码完成吗?

Possible to make an event handler wait until async / Promise-based code is done?

我在 nodejs 模式下使用优秀的 Papa Parse 库,将一个超过 100 万行的大型 (500 MB) CSV 文件流式传输到一个缓慢的持久性 API,它只能接受一个请求一个时间。持久性 API 基于 Promises,但是从 Papa Parse,我在 synchronous 事件中收到每个解析的 CSV 行,如下所示:parseStream.on("data", row => { ... }

我面临的挑战是 Papa Parse 从流中转储其 CSV 行的速度如此之快,以至于我缓慢的持久性 API 跟不上。因为 Papa 是 synchronous 而我的 API 是基于 Promise 的,所以我不能只调用 await doDirtyWork(row)on 事件处理程序,因为同步和异步代码不混合。

或者它们可以混合,我只是不知道如何混合?

我的问题是,我可以让爸爸的事件处理程序等待我的 API 调用完成吗?直接在 on("data") 事件中执行持久性 API 请求,使 on() 函数以某种方式徘徊,直到肮脏的 API 工作完成?

就内存占用而言,我目前的解决方案并不比使用 Papa 的非流模式好多少。实际上,我需要以生成器函数迭代的形式 排队 on("data") 事件的洪流。我也可以将 promise 工厂排成一个数组,然后循环处理它。不管怎样,我最终将几乎整个 CSV 文件保存为内存中未来承诺(承诺工厂)的巨大集合,直到我缓慢的 API 调用一直有效。

async importCSV(filePath) {
    let parsedNum = 0, processedNum = 0;

    async function* gen() {
        let pf = yield;
        do {
            pf = yield await pf();
        } while (typeof pf === "function");
    };

    var g = gen();
    g.next();


    await new Promise((resolve, reject) => {
        try {
            const dataStream = fs.createReadStream(filePath);
            const parseStream = Papa.parse(Papa.NODE_STREAM_INPUT, {delimiter: ",", header: false});
            dataStream.pipe(parseStream);

            parseStream.on("data", row => {

                // Received a CSV row from Papa.parse()

                try {
                    console.log("PA#", parsedNum, ": parsed", row.filter((e, i) => i <= 2 ? e : undefined)
                    );
                    parsedNum++;

                    // Simulate some really slow async/await dirty work here, for example 
                    // send requests to a one-at-a-time persistence API

                    g.next(() => {  // don't execute now, call in sequence via the generator above
                        return new Promise((res, rej) => {
                            console.log(
                                "DW#", processedNum, ": dirty work START",
                                row.filter((e, i) => i <= 2 ? e : undefined)
                            );
                            setTimeout(() => {
                                console.log(
                                    "DW#", processedNum, ": dirty work STOP ",
                                    row.filter((e, i) => i <= 2 ? e : undefined)
                                );
                                processedNum++;
                                res();
                            }, 1000)
                        })
                    
                    });
                } catch (err) {
                    console.log(err.stack);
                    reject(err);                    
                }
            });
            parseStream.on("finish", () => {
                console.log(`Parsed ${parsedNum} rows`);
                resolve();
            });

        } catch (err) {
            console.log(err.stack);
            reject(err);                    
        }
    });
    while(!(await g.next()).done);
}

所以为什么要赶爸爸?为什么不允许我慢一点处理文件——原始 CSV 文件中的数据不会 运行 消失,我们有几个小时来完成流式传输,为什么要用 on("data") 事件来敲打我我似乎不能放慢速度?

所以我真正需要的是让爸爸变得更像个爷爷,并尽量减少或消除 CSV 行的任何排队或缓冲。理想情况下,我能够将爸爸的解析事件与我的 API 的速度(或缺乏)完全同步。因此,如果不是因为异步代码无法让同步代码“休眠”这一教条,我最好将每个 CSV 行发送到 Papa 事件中的 API , 并且只有 然后 return 控制给爸爸。

建议?事件处理程序的某种“松散耦合”与我的异步 API 的缓慢也很好。我不介意几百行排队。但是当数以万计的堆起来的时候,我会运行 out of heap fast.

JavaScript 中的异步代码有时可能有点难以理解。重要的是要记住 Node 是如何处理并发的。

节点进程是single-threaded,但它使用了一个称为事件循环的概念。这样做的结果是异步代码和回调本质上是同一事物的等效表示。

当然,您需要一个异步函数才能使用 await,但是您从 Papa Parse 的回调可以是一个异步函数:

parse.on("data", async row => {
  await sync(row)
})

一旦await操作完成,箭头函数结束,所有对行的引用将被清除,因此垃圾收集器可以成功收集row,释放内存。

这样做的效果是每次解析一行时并发执行sync,所以如果您一次只能同步一条记录,那么我建议将同步功能包装在去抖动器中。

Why hammer me with on("data") events that I can't seem to slow down?

你可以,你只是没有让爸爸停下来。您可以通过调用 stream.pause(), then later stream.resume() 来使用节点流的内置 back-pressure.

但是,API 有比您自己在 callback-based 代码中处理此问题更好用的方法:use the stream as an async iterator!当您在 for await 循环体中 await 时,生成器也必须暂停。所以你可以写

async importCSV(filePath) {
    let parsedNum = 0;

    const dataStream = fs.createReadStream(filePath);
    const parseStream = Papa.parse(Papa.NODE_STREAM_INPUT, {delimiter: ",", header: false});
    dataStream.pipe(parseStream);

    for await (const row of parseStream) {
        // Received a CSV row from Papa.parse()
        const data = row.filter((e, i) => i <= 2 ? e : undefined);
        console.log("PA#", parsedNum, ": parsed", data);
        parsedNum++;
        await dirtyWork(data);
    }
    console.log(`Parsed ${parsedNum} rows`);
}

importCSV('sample.csv').catch(console.error);

let processedNum = 0;
function dirtyWork(data) {
    // Simulate some really slow async/await dirty work here,
    // for example send requests to a one-at-a-time persistence API
    return new Promise((res, rej) => {
        console.log("DW#", processedNum, ": dirty work START", data)
        setTimeout(() => {
             console.log("DW#", processedNum, ": dirty work STOP ", data);
             processedNum++;
             res();
        }, 1000);
    });
}