由 fs.createReadStream 删除的流不会写入数组,尽管包装在一个承诺中

Stream ceated by fs.createReadStream does not write into array although wrapped into a promise

我正在从 csv 文件中读取一个流,并希望将某些行推送到一个数组中。不幸的是,在写入数组完成之前,流被关闭了。我将在这里分享我的代码:

const parseFromCSVFile = (filename: string, start: number, end: number)=> {
return new Promise((resolve, reject) => {
    let jsonArray: any = []
    const readingStream = fs.createReadStream(`${filepath}/${filename}.csv`, {start, end})
        readingStream.on('error', (error) => {
            console.log("error", error);
        })
    
        .pipe(csv({ separator: ',' }))
        .on('data', (line) => {
            
            readingStream.pause()
            jsonArray.push(line)
            readingStream.resume()
                
        })
    
        .on('end', () => {
            resolve(jsonArray)
            
        })
        .on("error", () => {
            reject()
        })
    })
}

parseFromCSVFile("test", 0 , 10000).then((res: any) => {
    console.log(res);
    console.log(res.length);
    
    
})

变量 res 的长度应该是 10000,但不幸的是它的长度是 53。如果我进一步减少 end 函数 returns 一个空数组。如果有人可以帮助我解决这个问题或者为此推荐另一个库或方法,我将不胜感激!我为此搜索了很多 Whosebug 问答!

我用不同的方式解决了它。由于 startend 变量在这种情况下不引用文件中的行,而是引用我需要删除它们的字节,因为它们导致数据被剥离到 53 个对象或 10000 字节。我的最终解决方案如下所示:

export const parseFromCSVFile = (filename: string, start: number, end: number)=> {
return new Promise((resolve, reject) => {
    let jsonArray: any = []
    let count = 0
    const readingStream = fs.createReadStream(`${tempFilePath}/${filename}`)
        readingStream.on('error', (error) => {
            reject({message: error.message})
        })
        .pipe(csv({ separator: ',' }))
        .on('data', (line) => {
            count >= start && count <= end && jsonArray.push(line)
            if(count > end ){
                resolve(jsonArray)
                readingStream.close() 
        }
                
        count += 1
        })

        .on('end', () => {
            resolve(jsonArray)
            readingStream.close()
            
        })
        .on("error", (error) => {
            reject({message: error.message})
        })
    })
}