nodejs 异步等待在 createReadStream 中
nodejs async await inside createReadStream
我正在逐行读取 CSV 文件,inserting/updating 在 MongoDB 中读取。预期的输出将是
1. console.log(行);
2. console.log(光标);
3.console.log("stream");
但是输出像
1. console.log(行);
console.log(行); console.log(行); console.log(行); console.log(行); ………………
2. console.log(光标);
3.console.log("stream");
请让我知道我在这里缺少什么。
const csv = require('csv-parser');
const fs = require('fs');
var mongodb = require("mongodb");
var client = mongodb.MongoClient;
var url = "mongodb://localhost:27017/";
var collection;
client.connect(url,{ useUnifiedTopology: true }, function (err, client) {
var db = client.db("UKCompanies");
collection = db.collection("company");
startRead();
});
var cursor={};
async function insertRec(row){
console.log(row);
cursor = await collection.update({CompanyNumber:23}, row, {upsert: true});
if(cursor){
console.log(cursor);
}else{
console.log('not exist')
}
console.log("stream");
}
async function startRead() {
fs.createReadStream('./data/inside/6.csv')
.pipe(csv())
.on('data', async (row) => {
await insertRec(row);
})
.on('end', () => {
console.log('CSV file successfully processed');
});
}
在这种情况下这是预期的行为,因为您的 on
数据侦听器会在数据流中可用时异步触发 insertRec
。所以这就是为什么你的第一行插入方法被并行执行的原因。如果您想控制此行为,您可以在创建读取流时使用 highWaterMark
(https://nodejs.org/api/stream.html#stream_readable_readablehighwatermark) 属性。这样您将一次获得 1 条记录,但我不确定您的用例是什么。
像这样
fs.createReadStream(`somefile.csv`, {
"highWaterMark": 1
})
此外,您没有等待 startRead
方法。我会将它包装在 promise 中并在 end
侦听器中解决它,否则你将不知道处理何时完成。像
function startRead() {
return new Promise((resolve, reject) => {
fs.createReadStream(`somepath`)
.pipe(csv())
.on("data", async row => {
await insertRec(row);
})
.on("error", err => {
reject(err);
})
.on("end", () => {
console.log("CSV file successfully processed");
resolve();
});
});
}
在您的 startRead()
函数中,await insertRec()
不会在 insertRec()
处理时阻止更多 data
事件的流动。因此,如果您不希望下一个 data
事件到 运行,直到 insertRec()
完成,您需要暂停,然后恢复流。
async function startRead() {
const stream = fs.createReadStream('./data/inside/6.csv')
.pipe(csv())
.on('data', async (row) => {
try {
stream.pause();
await insertRec(row);
} finally {
stream.resume();
}
})
.on('end', () => {
console.log('CSV file successfully processed');
});
}
仅供参考,如果 insertRec()
失败,您还需要一些错误处理。
我正在逐行读取 CSV 文件,inserting/updating 在 MongoDB 中读取。预期的输出将是 1. console.log(行); 2. console.log(光标); 3.console.log("stream");
但是输出像 1. console.log(行); console.log(行); console.log(行); console.log(行); console.log(行); ……………… 2. console.log(光标); 3.console.log("stream"); 请让我知道我在这里缺少什么。
const csv = require('csv-parser');
const fs = require('fs');
var mongodb = require("mongodb");
var client = mongodb.MongoClient;
var url = "mongodb://localhost:27017/";
var collection;
client.connect(url,{ useUnifiedTopology: true }, function (err, client) {
var db = client.db("UKCompanies");
collection = db.collection("company");
startRead();
});
var cursor={};
async function insertRec(row){
console.log(row);
cursor = await collection.update({CompanyNumber:23}, row, {upsert: true});
if(cursor){
console.log(cursor);
}else{
console.log('not exist')
}
console.log("stream");
}
async function startRead() {
fs.createReadStream('./data/inside/6.csv')
.pipe(csv())
.on('data', async (row) => {
await insertRec(row);
})
.on('end', () => {
console.log('CSV file successfully processed');
});
}
在这种情况下这是预期的行为,因为您的 on
数据侦听器会在数据流中可用时异步触发 insertRec
。所以这就是为什么你的第一行插入方法被并行执行的原因。如果您想控制此行为,您可以在创建读取流时使用 highWaterMark
(https://nodejs.org/api/stream.html#stream_readable_readablehighwatermark) 属性。这样您将一次获得 1 条记录,但我不确定您的用例是什么。
像这样
fs.createReadStream(`somefile.csv`, {
"highWaterMark": 1
})
此外,您没有等待 startRead
方法。我会将它包装在 promise 中并在 end
侦听器中解决它,否则你将不知道处理何时完成。像
function startRead() {
return new Promise((resolve, reject) => {
fs.createReadStream(`somepath`)
.pipe(csv())
.on("data", async row => {
await insertRec(row);
})
.on("error", err => {
reject(err);
})
.on("end", () => {
console.log("CSV file successfully processed");
resolve();
});
});
}
在您的 startRead()
函数中,await insertRec()
不会在 insertRec()
处理时阻止更多 data
事件的流动。因此,如果您不希望下一个 data
事件到 运行,直到 insertRec()
完成,您需要暂停,然后恢复流。
async function startRead() {
const stream = fs.createReadStream('./data/inside/6.csv')
.pipe(csv())
.on('data', async (row) => {
try {
stream.pause();
await insertRec(row);
} finally {
stream.resume();
}
})
.on('end', () => {
console.log('CSV file successfully processed');
});
}
仅供参考,如果 insertRec()
失败,您还需要一些错误处理。