如何在nodejs中使用事件和管道创建函数

How to make a function with events and pipe in nodejs

我正在用打字稿写一个 nodejs 库,这个库的主要范围是从给定的 url 下载东西,我希望它能像这样使用

import library from 'library'

library('https://www.example.com')
    .on('progress', (progress: progress) => {
        //do something with the progress
    })
    .on('end', () => {
        //do something when done
    })
    .pipe(fs.createWriteStream('./test/file.mp4'))

我从来没有以这种方式处理过节点事件和流,所以我什至不知道该怎么做我正在使用 typescript 和 webpack 也请原谅糟糕的英语

您将需要实施 Readable stream. Node streams are instances of EventEmitter,这样您就可以自动访问事件 API。

至少,您需要实现一个 _read() 方法,只要消费者准备好从队列中接收更多数据,就会在内部调用该方法。由于您希望库报告进度,因此您还需要跟踪已处理的数据量并相应地发出事件。

下面的代码忽略了一些重要的事情,例如 backpressuring, but it's a start. I'm using node-fetch 作为请求库,因为它公开了一个底层响应流并且相当容易使用。

// fileLoader.js
const {Readable} = require('stream')
const fetch = require('node-fetch')

class FileLoader extends Readable {
  constructor(url) {
    super()
    this._url = url
    this._fetchStarted = false
    this._totalLength = 0
    this._currentLength = 0
  }

  _processData(stream) {
    stream
      .on('end', () => {
        this.push(null)
      })
      .on('error', (err) => {
        this.destroy(err)
      })
      .on('data', (chunk) => {
        this._currentLength += chunk.length
        if (this._totalLength) {
          this.emit('progress', Math.round(this._currentLength / this._totalLength * 100))
        }
        this.push(chunk)
      })
  }

  _startFetch() {
    fetch(this._url)
      .then((res) => {
        if (!res.ok) {
          return this.destroy(new Error(`fetch resulted in ${res.status}`))
        }
        this._totalLength = res.headers.get('content-length')
        this._processData(res.body)
      })
      .catch((err) => {
        return this.destroy(new Error(err))
      })
  }

  _read() {
    if (!this._fetchStarted) {
      this._fetchStarted = true
      this._startFetch()
    }
  }
}

module.exports.loadFile = (url) => new FileLoader(url)

消费者代码:

// consumer.js
const fs = require('fs')
const {loadFile} = require('./fileLoader')

loadFile('http://example.com/video.mp4')
  .on('progress', (progress) => {
    console.log(`${progress}%`)
  })
  .on('end', () => {
    console.log('done')
  })
  .on('error', (err) => {
    console.log(err)
  })
  .pipe(fs.createWriteStream('./tempy.mp4'))