可以让事件处理程序等到异步/基于 Promise 的代码完成吗?
Possible to make an event handler wait until async / Promise-based code is done?
我在 nodejs 模式下使用优秀的 Papa Parse 库,将一个超过 100 万行的大型 (500 MB) CSV 文件流式传输到一个缓慢的持久性 API,它只能接受一个请求一个时间。持久性 API 基于 Promise
s,但是从 Papa Parse,我在 synchronous 事件中收到每个解析的 CSV 行,如下所示:parseStream.on("data", row => { ... }
我面临的挑战是 Papa Parse 从流中转储其 CSV 行的速度如此之快,以至于我缓慢的持久性 API 跟不上。因为 Papa 是 synchronous 而我的 API 是基于 Promise 的,所以我不能只调用 await doDirtyWork(row)
在 on
事件处理程序,因为同步和异步代码不混合。
或者它们可以混合,我只是不知道如何混合?
我的问题是,我可以让爸爸的事件处理程序等待我的 API 调用完成吗?直接在 on("data")
事件中执行持久性 API 请求,使 on()
函数以某种方式徘徊,直到肮脏的 API 工作完成?
就内存占用而言,我目前的解决方案并不比使用 Papa 的非流模式好多少。实际上,我需要以生成器函数迭代的形式 排队 on("data")
事件的洪流。我也可以将 promise 工厂排成一个数组,然后循环处理它。不管怎样,我最终将几乎整个 CSV 文件保存为内存中未来承诺(承诺工厂)的巨大集合,直到我缓慢的 API 调用一直有效。
async importCSV(filePath) {
let parsedNum = 0, processedNum = 0;
async function* gen() {
let pf = yield;
do {
pf = yield await pf();
} while (typeof pf === "function");
};
var g = gen();
g.next();
await new Promise((resolve, reject) => {
try {
const dataStream = fs.createReadStream(filePath);
const parseStream = Papa.parse(Papa.NODE_STREAM_INPUT, {delimiter: ",", header: false});
dataStream.pipe(parseStream);
parseStream.on("data", row => {
// Received a CSV row from Papa.parse()
try {
console.log("PA#", parsedNum, ": parsed", row.filter((e, i) => i <= 2 ? e : undefined)
);
parsedNum++;
// Simulate some really slow async/await dirty work here, for example
// send requests to a one-at-a-time persistence API
g.next(() => { // don't execute now, call in sequence via the generator above
return new Promise((res, rej) => {
console.log(
"DW#", processedNum, ": dirty work START",
row.filter((e, i) => i <= 2 ? e : undefined)
);
setTimeout(() => {
console.log(
"DW#", processedNum, ": dirty work STOP ",
row.filter((e, i) => i <= 2 ? e : undefined)
);
processedNum++;
res();
}, 1000)
})
});
} catch (err) {
console.log(err.stack);
reject(err);
}
});
parseStream.on("finish", () => {
console.log(`Parsed ${parsedNum} rows`);
resolve();
});
} catch (err) {
console.log(err.stack);
reject(err);
}
});
while(!(await g.next()).done);
}
所以为什么要赶爸爸?为什么不允许我慢一点处理文件——原始 CSV 文件中的数据不会 运行 消失,我们有几个小时来完成流式传输,为什么要用 on("data")
事件来敲打我我似乎不能放慢速度?
所以我真正需要的是让爸爸变得更像个爷爷,并尽量减少或消除 CSV 行的任何排队或缓冲。理想情况下,我能够将爸爸的解析事件与我的 API 的速度(或缺乏)完全同步。因此,如果不是因为异步代码无法让同步代码“休眠”这一教条,我最好将每个 CSV 行发送到 Papa 事件中的 API , 并且只有 然后 return 控制给爸爸。
建议?事件处理程序的某种“松散耦合”与我的异步 API 的缓慢也很好。我不介意几百行排队。但是当数以万计的堆起来的时候,我会运行 out of heap fast.
JavaScript 中的异步代码有时可能有点难以理解。重要的是要记住 Node 是如何处理并发的。
节点进程是single-threaded,但它使用了一个称为事件循环的概念。这样做的结果是异步代码和回调本质上是同一事物的等效表示。
当然,您需要一个异步函数才能使用 await
,但是您从 Papa Parse 的回调可以是一个异步函数:
parse.on("data", async row => {
await sync(row)
})
一旦await
操作完成,箭头函数结束,所有对行的引用将被清除,因此垃圾收集器可以成功收集row
,释放内存。
这样做的效果是每次解析一行时并发执行sync
,所以如果您一次只能同步一条记录,那么我建议将同步功能包装在去抖动器中。
Why hammer me with on("data")
events that I can't seem to slow down?
你可以,你只是没有让爸爸停下来。您可以通过调用 stream.pause()
, then later stream.resume()
来使用节点流的内置 back-pressure.
但是,API 有比您自己在 callback-based 代码中处理此问题更好用的方法:use the stream as an async iterator!当您在 for await
循环体中 await
时,生成器也必须暂停。所以你可以写
async importCSV(filePath) {
let parsedNum = 0;
const dataStream = fs.createReadStream(filePath);
const parseStream = Papa.parse(Papa.NODE_STREAM_INPUT, {delimiter: ",", header: false});
dataStream.pipe(parseStream);
for await (const row of parseStream) {
// Received a CSV row from Papa.parse()
const data = row.filter((e, i) => i <= 2 ? e : undefined);
console.log("PA#", parsedNum, ": parsed", data);
parsedNum++;
await dirtyWork(data);
}
console.log(`Parsed ${parsedNum} rows`);
}
importCSV('sample.csv').catch(console.error);
let processedNum = 0;
function dirtyWork(data) {
// Simulate some really slow async/await dirty work here,
// for example send requests to a one-at-a-time persistence API
return new Promise((res, rej) => {
console.log("DW#", processedNum, ": dirty work START", data)
setTimeout(() => {
console.log("DW#", processedNum, ": dirty work STOP ", data);
processedNum++;
res();
}, 1000);
});
}
我在 nodejs 模式下使用优秀的 Papa Parse 库,将一个超过 100 万行的大型 (500 MB) CSV 文件流式传输到一个缓慢的持久性 API,它只能接受一个请求一个时间。持久性 API 基于 Promise
s,但是从 Papa Parse,我在 synchronous 事件中收到每个解析的 CSV 行,如下所示:parseStream.on("data", row => { ... }
我面临的挑战是 Papa Parse 从流中转储其 CSV 行的速度如此之快,以至于我缓慢的持久性 API 跟不上。因为 Papa 是 synchronous 而我的 API 是基于 Promise 的,所以我不能只调用 await doDirtyWork(row)
在 on
事件处理程序,因为同步和异步代码不混合。
或者它们可以混合,我只是不知道如何混合?
我的问题是,我可以让爸爸的事件处理程序等待我的 API 调用完成吗?直接在 on("data")
事件中执行持久性 API 请求,使 on()
函数以某种方式徘徊,直到肮脏的 API 工作完成?
就内存占用而言,我目前的解决方案并不比使用 Papa 的非流模式好多少。实际上,我需要以生成器函数迭代的形式 排队 on("data")
事件的洪流。我也可以将 promise 工厂排成一个数组,然后循环处理它。不管怎样,我最终将几乎整个 CSV 文件保存为内存中未来承诺(承诺工厂)的巨大集合,直到我缓慢的 API 调用一直有效。
async importCSV(filePath) {
let parsedNum = 0, processedNum = 0;
async function* gen() {
let pf = yield;
do {
pf = yield await pf();
} while (typeof pf === "function");
};
var g = gen();
g.next();
await new Promise((resolve, reject) => {
try {
const dataStream = fs.createReadStream(filePath);
const parseStream = Papa.parse(Papa.NODE_STREAM_INPUT, {delimiter: ",", header: false});
dataStream.pipe(parseStream);
parseStream.on("data", row => {
// Received a CSV row from Papa.parse()
try {
console.log("PA#", parsedNum, ": parsed", row.filter((e, i) => i <= 2 ? e : undefined)
);
parsedNum++;
// Simulate some really slow async/await dirty work here, for example
// send requests to a one-at-a-time persistence API
g.next(() => { // don't execute now, call in sequence via the generator above
return new Promise((res, rej) => {
console.log(
"DW#", processedNum, ": dirty work START",
row.filter((e, i) => i <= 2 ? e : undefined)
);
setTimeout(() => {
console.log(
"DW#", processedNum, ": dirty work STOP ",
row.filter((e, i) => i <= 2 ? e : undefined)
);
processedNum++;
res();
}, 1000)
})
});
} catch (err) {
console.log(err.stack);
reject(err);
}
});
parseStream.on("finish", () => {
console.log(`Parsed ${parsedNum} rows`);
resolve();
});
} catch (err) {
console.log(err.stack);
reject(err);
}
});
while(!(await g.next()).done);
}
所以为什么要赶爸爸?为什么不允许我慢一点处理文件——原始 CSV 文件中的数据不会 运行 消失,我们有几个小时来完成流式传输,为什么要用 on("data")
事件来敲打我我似乎不能放慢速度?
所以我真正需要的是让爸爸变得更像个爷爷,并尽量减少或消除 CSV 行的任何排队或缓冲。理想情况下,我能够将爸爸的解析事件与我的 API 的速度(或缺乏)完全同步。因此,如果不是因为异步代码无法让同步代码“休眠”这一教条,我最好将每个 CSV 行发送到 Papa 事件中的 API , 并且只有 然后 return 控制给爸爸。
建议?事件处理程序的某种“松散耦合”与我的异步 API 的缓慢也很好。我不介意几百行排队。但是当数以万计的堆起来的时候,我会运行 out of heap fast.
JavaScript 中的异步代码有时可能有点难以理解。重要的是要记住 Node 是如何处理并发的。
节点进程是single-threaded,但它使用了一个称为事件循环的概念。这样做的结果是异步代码和回调本质上是同一事物的等效表示。
当然,您需要一个异步函数才能使用 await
,但是您从 Papa Parse 的回调可以是一个异步函数:
parse.on("data", async row => {
await sync(row)
})
一旦await
操作完成,箭头函数结束,所有对行的引用将被清除,因此垃圾收集器可以成功收集row
,释放内存。
这样做的效果是每次解析一行时并发执行sync
,所以如果您一次只能同步一条记录,那么我建议将同步功能包装在去抖动器中。
Why hammer me with
on("data")
events that I can't seem to slow down?
你可以,你只是没有让爸爸停下来。您可以通过调用 stream.pause()
, then later stream.resume()
来使用节点流的内置 back-pressure.
但是,API 有比您自己在 callback-based 代码中处理此问题更好用的方法:use the stream as an async iterator!当您在 for await
循环体中 await
时,生成器也必须暂停。所以你可以写
async importCSV(filePath) {
let parsedNum = 0;
const dataStream = fs.createReadStream(filePath);
const parseStream = Papa.parse(Papa.NODE_STREAM_INPUT, {delimiter: ",", header: false});
dataStream.pipe(parseStream);
for await (const row of parseStream) {
// Received a CSV row from Papa.parse()
const data = row.filter((e, i) => i <= 2 ? e : undefined);
console.log("PA#", parsedNum, ": parsed", data);
parsedNum++;
await dirtyWork(data);
}
console.log(`Parsed ${parsedNum} rows`);
}
importCSV('sample.csv').catch(console.error);
let processedNum = 0;
function dirtyWork(data) {
// Simulate some really slow async/await dirty work here,
// for example send requests to a one-at-a-time persistence API
return new Promise((res, rej) => {
console.log("DW#", processedNum, ": dirty work START", data)
setTimeout(() => {
console.log("DW#", processedNum, ": dirty work STOP ", data);
processedNum++;
res();
}, 1000);
});
}