在 createReadStream 的 data 事件回调中正确使用 fs 的 write

Correctly using fs write inside a createReadStream 'data' event handler

我正在尝试在 JavaScript 中使用流将 n 个二进制文件合并成一个文件。我把一个写入流传给了下面的函数。我注意到写入的总字节数与文件的实际字节数不一致,而且多次运行的结果也各不相同。

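阅读文档后,我注意到 write 调用会返回一个 promise,并且在前一个 promise 完成之前再次调用它是不安全的。我不知道如何在 readStream.on('data', function (chunk) {...}) 的回调中使用 await:因为这个回调不是 async 函数,我得到了错误 await is only valid in async function。(注:返回 promise 的是 fs.promises 中 FileHandle 的 write();而这里传入的是 fs.createWriteStream() 创建的 WriteStream,它的 write() 返回布尔值,返回 false 时应等待 'drain' 事件后再继续写入。)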

/**
 * Append the contents of `filename` to an already-open writable stream.
 *
 * The file is read in chunks of up to 1024 bytes. Each chunk is written and
 * its write callback awaited before the next chunk is pulled, so:
 *   - backpressure is honored (at most one chunk is queued in the writer);
 *   - when the returned promise resolves, every byte of the file has been
 *     flushed by the writer, so for an fs.WriteStream `bytesWritten` already
 *     includes this file.
 *
 * The writer is left open so more files can be appended; the caller is
 * responsible for ending it.
 *
 * @param {string} filename - path of the file to append.
 * @param {import('stream').Writable} fileHandle - destination writable
 *   stream, e.g. from fs.createWriteStream (despite the name, an
 *   fs.promises FileHandle is not supported).
 * @returns {Promise<void>} resolves once the whole file has been written;
 *   rejects on a read error or if the writer reports a write error.
 */
async function concatFile (filename, fileHandle) {
  const readStream = fs.createReadStream(filename, { highWaterMark: 1024 });
  // Async iteration only pulls the next chunk after the loop body finishes.
  // If the body throws, the read stream is destroyed, which closes its fd.
  for await (const chunk of readStream) {
    await new Promise((resolve, reject) => {
      // The callback fires once the chunk has been flushed (or failed).
      fileHandle.write(chunk, (err) => (err ? reject(err) : resolve()));
    });
  }
}

我在以下代码段中使用上述函数:

  // Append each listed file, in order, to concatBins.bin and log the
  // writer's running byte count after each one.
  const fileWriter = fs.createWriteStream('concatBins.bin', { flags: 'w' });
  let writtenLen = 0;
  // Declared explicitly: assigning to an undeclared name creates an implicit
  // global (a ReferenceError in strict mode / ES modules).
  const fileList = ['foo.bin', 'bar.bin'];
  for (const fileName of fileList) {
    await concatFile(fileName, fileWriter);
    writtenLen = fileWriter.bytesWritten;
    console.log(`bytes written ${writtenLen}`);
  }
  // NOTE(review): fileWriter is never ended in this snippet; call
  // fileWriter.end() once all files are appended (unless later code keeps
  // writing to it) so buffered data is flushed and the fd is closed.

您可以在写入完成之前暂停(pause)readStream,以免收到后续的 data 事件,并在写入完成后再恢复(resume)它。另外,如果您想使用 await,可以把 .on('data', ...) 回调声明为 async。但您仍然必须暂停 readStream,因为 async/await 不会替您暂停它。

// stream write that returns a promise when OK to proceed
// with more writes
function write(stream, data) {
    return new Promise((resolve, reject) => {
         if (stream.write(data)) {
             resolve();
         } else {
             // need to wait for drain event
             stream.once('drain', resolve);
         }
         
    });
}

/**
 * Append the contents of `filename` to `writeStream`, one chunk at a time.
 *
 * Each 'data' chunk pauses the read stream until `write()` reports that the
 * writer can take more data, then resumes it, so the writer's buffer never
 * grows without bound. The writer is not ended, so more files can be
 * appended after this resolves.
 *
 * @param {string} filename - path of the file to append.
 * @param {import('stream').Writable} writeStream - destination stream.
 * @returns {Promise<void>} resolves after the last chunk has been accepted
 *   by `write()`; rejects on a read error or if `write()` rejects, in which
 *   case the read stream is destroyed and no further chunks are written.
 */
function concatFile (filename, writeStream) {
  return new Promise((resolve, reject) => {
    const readStream = fs.createReadStream(filename, { highWaterMark: 1024 });
    let writing = false; // a chunk's write is still pending
    let ended = false;   // readStream has emitted 'end'
    let settled = false; // resolve/reject has already been called

    const finish = () => {
      if (settled) return;
      settled = true;
      resolve();
    };

    const fail = (err) => {
      if (settled) return;
      settled = true;
      // Stop reading: there is no point pulling more data after a failure.
      readStream.destroy();
      reject(err);
    };

    readStream.on('data', async (chunk) => {
      // async/await does not pause the stream by itself; without this,
      // further 'data' events would fire while this write is pending.
      readStream.pause();
      writing = true;
      try {
        await write(writeStream, chunk);
      } catch (err) {
        fail(err);
        return;
      }
      writing = false;
      if (settled) return; // a read error arrived while we were writing
      if (ended) {
        // 'end' already fired during the write and left completion to us.
        finish();
      } else {
        readStream.resume();
      }
    });

    readStream.on('error', fail);

    readStream.on('end', () => {
      ended = true;
      // If a write is still pending, the 'data' handler finishes instead.
      if (!writing) finish();
    });
    // fs read streams close their fd automatically after 'end' or an error
    // (autoClose defaults to true), so no explicit close() is needed.
  });
}