解析大型 CSV 和流式传输承诺行
Parsing large CSV and streaming rows of promises
在尝试流式传输 csv、为每一行发出 http 请求以及让所有内容执行并以“正确”顺序记录到控制台时有点混乱。最后,我认为我没有兑现我的承诺,或者...?
const getUserByEmail = async (email) => {
const encodedEmail = encodeURIComponent(email);
try {
const response = await http.get(`users?email=${encodedEmail}`);
const userId = response.data.data[0] && response.data.data[0].id;
return (userId ? userId : `${email} not found`);
} catch (error) {
console.error('get user error: ', error);
}
};
const run = async () => {
console.log('==> Reading csv ...');
const promises = [];
const readStream = fs.createReadStream('import-test.csv')
.pipe(csv.parse({ headers: true }))
.on('error', (error) => console.error('stream error: ', error))
.on('data', (row) => {
promises.push(getUserByEmail(row.email));
})
.on('end', rowCount => {
console.log(`==> Parsed ${rowCount} rows from csv ...`);
})
await Promise.all(promises)
.then(values => console.log(values))
console.log('==> End of script')
};
run();
我正在尝试/期望上面的代码获取 csv 的每一行,将每个 http 调用(一个承诺)推送到一个承诺数组,并让所有内容 execute/log 按照我的顺序进行控制台期待中。
这是我的实际输出:
==> Reading csv...
[]
==> End of script
==> Parsed 10 rows from csv ...
这就是我所期待的:
==> Reading csv...
==> Parsed 10 rows from csv ...
[
QyDPkn3WZp,
e75KzrqYxK,
iqDXoEFMZy,
PstouMRz3y,
w188hLyeT6,
g18oxMOy6l,
8wjVJutFnh,
fakeEmail@fakeDomain.com not found,
QEHaG3cp7d,
y8I4oX6aCe
]
==> End of script
对我来说最大的问题是在“==> 脚本结束”之后有任何日志记录,这表明我对 when/why 所有以前的事件都在登录没有很好的把握他们的顺序。
最终——我还没有做到——我还想 buffer/time 这些请求每分钟 100 个,否则我将受到这个特定 API 的速率限制。
谢谢!
一直到 await Promise.all(promises)
的空洞 readStream 是同步的 - data
事件是异步的,并在另一个事件循环中填充承诺
因此,当您调用 Promise.all 时,promises 是一个空数组 - 您不是在等待流结束。你可能想把你的逻辑放在结束事件中而不是像这样
const run = async () => {
console.log('==> Reading csv ...');
const promises = [];
const readStream = fs.createReadStream('import-test.csv')
.pipe(csv.parse({ headers: true }))
.on('error', (error) => console.error('stream error: ', error))
.on('data', (row) => {
promises.push(getUserByEmail(row.email));
})
.on('end', async rowCount => {
await Promise.all(promises)
.then(values => console.log(values))
console.log('==> End of script')
})
}
另一种更简单的方法是使用异步迭代器
readStream 有一个 symbol.asyncIterator
可以使用
const run = async () => {
console.log('==> Reading csv ...');
let rowCount = 0
const promises = []
const readStream = fs.createReadStream('import-test.csv')
.pipe(csv.parse({ headers: true }))
for await (let row of readStream) {
rowCount++
promises.push(getUserByEmail(row.email));
}
console.log(`==> Parsed ${rowCount} rows from csv ...`)
await Promise.all(promises).then(console.log)
console.log('==> End of script')
}
我会进一步限制并发并执行:
const run = async () => {
console.log('==> Reading csv ...');
const result = []
const readStream = fs.createReadStream('import-test.csv')
.pipe(csv.parse({ headers: true }))
for await (let row of readStream) {
result.push(await getUserByEmail(row.email))
}
console.log(result)
console.log('==> End of script')
}
如果您想增加异步迭代器的并发性,请查看 但要小心。使用此方法时结果可能不正常
在尝试流式传输 csv、为每一行发出 http 请求以及让所有内容执行并以“正确”顺序记录到控制台时有点混乱。最后,我认为我没有兑现我的承诺,或者...?
const getUserByEmail = async (email) => {
const encodedEmail = encodeURIComponent(email);
try {
const response = await http.get(`users?email=${encodedEmail}`);
const userId = response.data.data[0] && response.data.data[0].id;
return (userId ? userId : `${email} not found`);
} catch (error) {
console.error('get user error: ', error);
}
};
const run = async () => {
console.log('==> Reading csv ...');
const promises = [];
const readStream = fs.createReadStream('import-test.csv')
.pipe(csv.parse({ headers: true }))
.on('error', (error) => console.error('stream error: ', error))
.on('data', (row) => {
promises.push(getUserByEmail(row.email));
})
.on('end', rowCount => {
console.log(`==> Parsed ${rowCount} rows from csv ...`);
})
await Promise.all(promises)
.then(values => console.log(values))
console.log('==> End of script')
};
run();
我正在尝试/期望上面的代码获取 csv 的每一行,将每个 http 调用(一个承诺)推送到一个承诺数组,并让所有内容 execute/log 按照我的顺序进行控制台期待中。
这是我的实际输出:
==> Reading csv...
[]
==> End of script
==> Parsed 10 rows from csv ...
这就是我所期待的:
==> Reading csv...
==> Parsed 10 rows from csv ...
[
QyDPkn3WZp,
e75KzrqYxK,
iqDXoEFMZy,
PstouMRz3y,
w188hLyeT6,
g18oxMOy6l,
8wjVJutFnh,
fakeEmail@fakeDomain.com not found,
QEHaG3cp7d,
y8I4oX6aCe
]
==> End of script
对我来说最大的问题是在“==> 脚本结束”之后有任何日志记录,这表明我对 when/why 所有以前的事件都在登录没有很好的把握他们的顺序。
最终——我还没有做到——我还想 buffer/time 这些请求每分钟 100 个,否则我将受到这个特定 API 的速率限制。
谢谢!
一直到 await Promise.all(promises)
的空洞 readStream 是同步的 - data
事件是异步的,并在另一个事件循环中填充承诺
因此,当您调用 Promise.all 时,promises 是一个空数组 - 您不是在等待流结束。你可能想把你的逻辑放在结束事件中而不是像这样
const run = async () => {
console.log('==> Reading csv ...');
const promises = [];
const readStream = fs.createReadStream('import-test.csv')
.pipe(csv.parse({ headers: true }))
.on('error', (error) => console.error('stream error: ', error))
.on('data', (row) => {
promises.push(getUserByEmail(row.email));
})
.on('end', async rowCount => {
await Promise.all(promises)
.then(values => console.log(values))
console.log('==> End of script')
})
}
另一种更简单的方法是使用异步迭代器
readStream 有一个 symbol.asyncIterator
可以使用
const run = async () => {
console.log('==> Reading csv ...');
let rowCount = 0
const promises = []
const readStream = fs.createReadStream('import-test.csv')
.pipe(csv.parse({ headers: true }))
for await (let row of readStream) {
rowCount++
promises.push(getUserByEmail(row.email));
}
console.log(`==> Parsed ${rowCount} rows from csv ...`)
await Promise.all(promises).then(console.log)
console.log('==> End of script')
}
我会进一步限制并发并执行:
const run = async () => {
console.log('==> Reading csv ...');
const result = []
const readStream = fs.createReadStream('import-test.csv')
.pipe(csv.parse({ headers: true }))
for await (let row of readStream) {
result.push(await getUserByEmail(row.email))
}
console.log(result)
console.log('==> End of script')
}
如果您想增加异步迭代器的并发性,请查看