获得关于流管道的不完整数据以进行快速响应

Got incomplete data on stream piping to an express response

需要将 DB table 转换为 csv 报告。
如果我立即用一个查询卸载整个 tablet,那么应用程序就会崩溃,因为内存用完了。我决定从 table 的 100 行部分中查询数据,将每一行转换为报告的一行并将其写入流中,该流通过管道进行快速响应。

这一切几乎是这样发生的:

  1. 数据库查询

    const select100Users = (maxUserCreationDateStr) => {
       return db.query(`
          SELECT * FROM users WHERE created_at < to_timestamp(${maxUserCreationDateStr})
          ORDER BY created_at DESC LIMIT 100`);
    }
    
  2. 流初始化

    const { PassThrough } = require('stream');
    const getUserReportStream = () => {
       const stream = new PassThrough();
       writeUserReport(stream).catch((e) => stream.emit('error', e));
       return stream;
    };
    
  3. 通过快速响应管道传输流

    app.get('/report', (req, res) => {
       const stream = getUserReportStream();
       res.setHeader('Content-Type', 'application/vnd.ms-excel');
       res.setHeader(`Content-Disposition', 'attachment; filename="${ filename }"`);
    
       stream.pipe(res);
    });
    
  4. 最后是如何将数据写入流

    const writeUserReport(stream) => {
       let maxUserCreationDateGlobal = Math.trunc(Date.now() / 1000);
       let flag = true;
    
       stream.write(USER_REPORT_HEADER);
    
       while (flag) {
          const rows100 = await select100Users(maxUserCreationDateGlobal);
          console.log(rows100.length);
    
          if (rows100.length === 0) {
             flag = false;
          } else {
             let maxUserCreationDate = maxUserCreationDateGlobal;
    
             const users100 = await Promise.all(
                rows100.map((r) => {
                   const created_at = r.created_at;
                   const createdAt = new Date(created_at);
    
                   if (created_at && createdAt.toString() !== 'Invalid Date') {
                      const createdAtNumber = Math.trunc(createdAt.valueOf() / 1000);
                      maxUserCreationDate = Math.min(maxUserCreationDate, createdAtNumber);
                   }
    
                   return mapUser(r); // returns a promise
                })
             );
    
             users100.forEach((u) => stream.write(generateCsvRowFromUser(u)));
             maxUserCreationDateGlobal = maxUserCreationDate;
    
             if (rows100.length < 100) {
                flag = false;
                console.log('***');
             }
          }
       }
    
       console.log('end');
       stream.end();
    };
    

因此我在控制台中看到了这个输出:

100 // 100
100 // 200
100 // 300
100 // 400
100 // 500
87  // 587
***
end

但是在下载的文件中我得到了 401 行(第一行 USER_REPORT_HEADER)。感觉就像 stream.end() 在从流中读取所有值之前关闭流。

我尝试以类似的方式使用 rxjs 中的 BehaviorSubject 而不是 PassThrough - 结果是一样的..

我如何等待从流中读取我在那里写入的所有数据?
或者也许有人可以推荐一种替代方法来解决这个问题。

stream.write 希望您将回调作为第二个(或第三个参数)传递,以了解写入操作何时完成。除非之前的写操作完成,否则不能再次调用 write。

所以一般来说,我建议将整个函数设为异步,每次调用 stream.write 时都将其包装成 Promise,例如

await new Promise((resolve, reject) => stream.write(data, (error) => {
   if (error) {
      reject(error);
      return;
   }
   resolve();
});

显然,将其提取到某种方法中是有意义的。

编辑:此外,我认为这不是实际问题。我假设您的 http 连接只是在所有提取完成之前超时,因此服务器最终将在达到超时期限后关闭流。