承诺中的一些 thenables 没有被执行
Some thenables in a promise is not being executed
我试图读取和解析一个大型 csv 文件,对于每一行我都必须进行一些异步计算并在操作完成后增加计数器。所以我创建了一个 Promise p
并尝试链接许多 .then(xxx)
并且在 csv 读取结束时它是最终的 .then(yyy)
输出计数。
然而这个数字并没有加起来。但是如果我做 p = p.then(xxx)
和 p = p.then(yyy)
数字会加起来(对于较小的 csv 文件)但我有时会面临内存泄漏(对于大型 csv 文件)。
我是不是哪里做错了?
var fs = require('fs')
const csv = require('fast-csv');
var Promise = require('bluebird')
var count = 0;
var actual = 0;
let p = Promise.resolve();
const stream = fs.createReadStream(`/Users/ssmlee/Desktop/KingKong_Sims_5M.txt`);
const csvStream = csv({
delimiter: ';'
})
.on('data', (row) => {
count++
if (count % 10000 === 0) {
console.log(count)
console.log(process.memoryUsage().heapUsed)
}
p.then(() => { // instead if we do p = p.then(() => it will work correctly
return Promise.resolve().delay(5)
.then(function() {
actual++
})
});
})
.on('end', () => {
p.then(() => { // instead if we do p = p.then(() => it will work correctly
console.log(actual); // 4999977 or something like this
console.log(count); // 5000000
});
});
stream.pipe(csvStream);
如果您延迟增加 actual
计数,但从不等待 promise(丢弃 then
的结果),流可能会结束,但并非所有增量都已经发生。在您的示例中,23 个回调仍在等待 5 毫秒的延迟。顺便说一句,将所有这些链接在同一个 p = Promise.resolve()
上没有多大意义,您可以立即执行所有内容。
如果您正在做 p = p.then(…)
,那么您会构建一个非常长的承诺链。这不应该泄漏任何内存,但会使用大量内存 - 所有 5 毫秒的延迟按顺序链接在一起,您的脚本将花费(至少)25000 秒到 运行。一开始读入文件,生成数百万个promise,然后一个个resolve(可以垃圾回收)
要高效地执行此顺序方法,您可能应该使用流的背压系统。
但是你也可以同时等待延迟,一次没有太多有效的承诺:
p = Promise.all([p, Promise.delay(5)]).then(() => {
actual++;
});
好吧,你想要 运行 并行的承诺,所以不能链接它们。
allp = [];
....
.on('data', (row) => {
...
allp.push( p.then(() => {...}) );
}
...
.on('end', () => {
Promise.all(allp).then(() => {})
当然,您要为每个事件创建一个 Promise。
如果你需要在结束前释放 promise,那么你需要自己做。
由于您似乎对承诺的 return 价值不感兴趣,而只对它们的副作用(增加计数)感兴趣,因此您可以
.on('data', (row) => {
...
if (allp.length > 50) allp = [Promise.all(allp).then(()=>null)];
allp.push( p.then(() => {...}) );
}
这样 50 个承诺将被分组,一旦它们解决,它们将被单个承诺取代(将进入下一个 50...)
.then(()=>null)
确保从 Promise.all 生成的数组也被丢弃。 (相反,对 null 的一个承诺将在 allp 中)
这确实取决于 Promise.all 的实施。如果 Promise.all 在每个 promise 解决(并且结果可用)时释放每个 promise,那么这是完美的。
如果 Promise.all 等待所有 50 个承诺,然后将它们全部释放,那么这仍然有效,除非每组 50 个都有一个非常长的 运行ning 承诺。
您可以使用延迟承诺的反模式。
在开始时创建一个延迟承诺。
var resolve;
var asyncRunningCount = 1; // start with 1
var p2 = new Promise(function() {
resolve = arguments[0];
});
在上数据
.on('data', (row) => {
...
asyncRunningCount++;
p.then(() => {work}) )
.then(() {
asyncRunningCount--;
if (asyncRunningCount == 0) resolve(); // no more tasks running
} );
}
.on('end', () => {
asyncRunningCount--;
// remove the 1 that was set on start. No more new tasks will be added
if (asyncRunningCount == 0) resolve(); // no more tasks running
p2.then(() => { all done })
如果 运行ning 任务的计数暂时下降到 0,则启动时的值 1 会阻止 p2 被解析。
在on(end)中,1被递减。如果所有任务都完成,asyncRunningCount 将为 0。这可以通过 on(end) 中的递减或 on(data) 中的递减来实现。
p2.then,将在所有任务完成后运行。
所有其他承诺将在完成后释放。
事实上,在 on(data) 中你不需要承诺。只需启动您的异步任务,当异步任务完成时递减 asyncRunningCount,并检查 0。
这仍然意味着如果数据进来的速度非常快,那么很多承诺会同时开始。
但是如果你不启动promises,那么你需要存储传入的数据,所以无论哪种方式都会使用内存。
我试图读取和解析一个大型 csv 文件,对于每一行我都必须进行一些异步计算并在操作完成后增加计数器。所以我创建了一个 Promise p
并尝试链接许多 .then(xxx)
并且在 csv 读取结束时它是最终的 .then(yyy)
输出计数。
然而这个数字并没有加起来。但是如果我做 p = p.then(xxx)
和 p = p.then(yyy)
数字会加起来(对于较小的 csv 文件)但我有时会面临内存泄漏(对于大型 csv 文件)。
我是不是哪里做错了?
var fs = require('fs')
const csv = require('fast-csv');
var Promise = require('bluebird')
var count = 0;
var actual = 0;
let p = Promise.resolve();
const stream = fs.createReadStream(`/Users/ssmlee/Desktop/KingKong_Sims_5M.txt`);
const csvStream = csv({
delimiter: ';'
})
.on('data', (row) => {
count++
if (count % 10000 === 0) {
console.log(count)
console.log(process.memoryUsage().heapUsed)
}
p.then(() => { // instead if we do p = p.then(() => it will work correctly
return Promise.resolve().delay(5)
.then(function() {
actual++
})
});
})
.on('end', () => {
p.then(() => { // instead if we do p = p.then(() => it will work correctly
console.log(actual); // 4999977 or something like this
console.log(count); // 5000000
});
});
stream.pipe(csvStream);
如果您延迟增加 actual
计数,但从不等待 promise(丢弃 then
的结果),流可能会结束,但并非所有增量都已经发生。在您的示例中,23 个回调仍在等待 5 毫秒的延迟。顺便说一句,将所有这些链接在同一个 p = Promise.resolve()
上没有多大意义,您可以立即执行所有内容。
如果您正在做 p = p.then(…)
,那么您会构建一个非常长的承诺链。这不应该泄漏任何内存,但会使用大量内存 - 所有 5 毫秒的延迟按顺序链接在一起,您的脚本将花费(至少)25000 秒到 运行。一开始读入文件,生成数百万个promise,然后一个个resolve(可以垃圾回收)
要高效地执行此顺序方法,您可能应该使用流的背压系统。
但是你也可以同时等待延迟,一次没有太多有效的承诺:
p = Promise.all([p, Promise.delay(5)]).then(() => {
actual++;
});
好吧,你想要 运行 并行的承诺,所以不能链接它们。
allp = [];
....
.on('data', (row) => {
...
allp.push( p.then(() => {...}) );
}
...
.on('end', () => {
Promise.all(allp).then(() => {})
当然,您要为每个事件创建一个 Promise。
如果你需要在结束前释放 promise,那么你需要自己做。
由于您似乎对承诺的 return 价值不感兴趣,而只对它们的副作用(增加计数)感兴趣,因此您可以
.on('data', (row) => {
...
if (allp.length > 50) allp = [Promise.all(allp).then(()=>null)];
allp.push( p.then(() => {...}) );
}
这样 50 个承诺将被分组,一旦它们解决,它们将被单个承诺取代(将进入下一个 50...)
.then(()=>null)
确保从 Promise.all 生成的数组也被丢弃。 (相反,对 null 的一个承诺将在 allp 中)
这确实取决于 Promise.all 的实施。如果 Promise.all 在每个 promise 解决(并且结果可用)时释放每个 promise,那么这是完美的。
如果 Promise.all 等待所有 50 个承诺,然后将它们全部释放,那么这仍然有效,除非每组 50 个都有一个非常长的 运行ning 承诺。
您可以使用延迟承诺的反模式。
在开始时创建一个延迟承诺。
var resolve;
var asyncRunningCount = 1; // start with 1
var p2 = new Promise(function() {
resolve = arguments[0];
});
在上数据
.on('data', (row) => {
...
asyncRunningCount++;
p.then(() => {work}) )
.then(() {
asyncRunningCount--;
if (asyncRunningCount == 0) resolve(); // no more tasks running
} );
}
.on('end', () => {
asyncRunningCount--;
// remove the 1 that was set on start. No more new tasks will be added
if (asyncRunningCount == 0) resolve(); // no more tasks running
p2.then(() => { all done })
如果 运行ning 任务的计数暂时下降到 0,则启动时的值 1 会阻止 p2 被解析。
在on(end)中,1被递减。如果所有任务都完成,asyncRunningCount 将为 0。这可以通过 on(end) 中的递减或 on(data) 中的递减来实现。
p2.then,将在所有任务完成后运行。
所有其他承诺将在完成后释放。 事实上,在 on(data) 中你不需要承诺。只需启动您的异步任务,当异步任务完成时递减 asyncRunningCount,并检查 0。
这仍然意味着如果数据进来的速度非常快,那么很多承诺会同时开始。 但是如果你不启动promises,那么你需要存储传入的数据,所以无论哪种方式都会使用内存。