如何在nodejs中使用事件和管道创建函数
How to make a function with events and pipe in nodejs
我正在用打字稿写一个 nodejs 库,这个库的主要范围是从给定的 url 下载东西,我希望它能像这样使用
import library from 'library'
library('https://www.example.com')
.on('progress', (progress: progress) => {
//do something with the progress
})
.on('end', () => {
//do something when done
})
.pipe(fs.createWriteStream('./test/file.mp4'))
我从来没有以这种方式处理过节点事件和流,所以我什至不知道该怎么做我正在使用 typescript 和 webpack 也请原谅糟糕的英语
您将需要实施 Readable stream. Node streams are instances of EventEmitter,这样您就可以自动访问事件 API。
至少,您需要实现一个 _read()
方法,只要消费者准备好从队列中接收更多数据,就会在内部调用该方法。由于您希望库报告进度,因此您还需要跟踪已处理的数据量并相应地发出事件。
下面的代码忽略了一些重要的事情,例如 backpressuring, but it's a start. I'm using node-fetch 作为请求库,因为它公开了一个底层响应流并且相当容易使用。
// fileLoader.js
const {Readable} = require('stream')
const fetch = require('node-fetch')
class FileLoader extends Readable {
constructor(url) {
super()
this._url = url
this._fetchStarted = false
this._totalLength = 0
this._currentLength = 0
}
_processData(stream) {
stream
.on('end', () => {
this.push(null)
})
.on('error', (err) => {
this.destroy(err)
})
.on('data', (chunk) => {
this._currentLength += chunk.length
if (this._totalLength) {
this.emit('progress', Math.round(this._currentLength / this._totalLength * 100))
}
this.push(chunk)
})
}
_startFetch() {
fetch(this._url)
.then((res) => {
if (!res.ok) {
return this.destroy(new Error(`fetch resulted in ${res.status}`))
}
this._totalLength = res.headers.get('content-length')
this._processData(res.body)
})
.catch((err) => {
return this.destroy(new Error(err))
})
}
_read() {
if (!this._fetchStarted) {
this._fetchStarted = true
this._startFetch()
}
}
}
module.exports.loadFile = (url) => new FileLoader(url)
消费者代码:
// consumer.js
const fs = require('fs')
const {loadFile} = require('./fileLoader')
loadFile('http://example.com/video.mp4')
.on('progress', (progress) => {
console.log(`${progress}%`)
})
.on('end', () => {
console.log('done')
})
.on('error', (err) => {
console.log(err)
})
.pipe(fs.createWriteStream('./tempy.mp4'))
我正在用打字稿写一个 nodejs 库,这个库的主要范围是从给定的 url 下载东西,我希望它能像这样使用
import library from 'library'
library('https://www.example.com')
.on('progress', (progress: progress) => {
//do something with the progress
})
.on('end', () => {
//do something when done
})
.pipe(fs.createWriteStream('./test/file.mp4'))
我从来没有以这种方式处理过节点事件和流,所以我什至不知道该怎么做我正在使用 typescript 和 webpack 也请原谅糟糕的英语
您将需要实施 Readable stream. Node streams are instances of EventEmitter,这样您就可以自动访问事件 API。
至少,您需要实现一个 _read()
方法,只要消费者准备好从队列中接收更多数据,就会在内部调用该方法。由于您希望库报告进度,因此您还需要跟踪已处理的数据量并相应地发出事件。
下面的代码忽略了一些重要的事情,例如 backpressuring, but it's a start. I'm using node-fetch 作为请求库,因为它公开了一个底层响应流并且相当容易使用。
// fileLoader.js
const {Readable} = require('stream')
const fetch = require('node-fetch')
class FileLoader extends Readable {
constructor(url) {
super()
this._url = url
this._fetchStarted = false
this._totalLength = 0
this._currentLength = 0
}
_processData(stream) {
stream
.on('end', () => {
this.push(null)
})
.on('error', (err) => {
this.destroy(err)
})
.on('data', (chunk) => {
this._currentLength += chunk.length
if (this._totalLength) {
this.emit('progress', Math.round(this._currentLength / this._totalLength * 100))
}
this.push(chunk)
})
}
_startFetch() {
fetch(this._url)
.then((res) => {
if (!res.ok) {
return this.destroy(new Error(`fetch resulted in ${res.status}`))
}
this._totalLength = res.headers.get('content-length')
this._processData(res.body)
})
.catch((err) => {
return this.destroy(new Error(err))
})
}
_read() {
if (!this._fetchStarted) {
this._fetchStarted = true
this._startFetch()
}
}
}
module.exports.loadFile = (url) => new FileLoader(url)
消费者代码:
// consumer.js
const fs = require('fs')
const {loadFile} = require('./fileLoader')
loadFile('http://example.com/video.mp4')
.on('progress', (progress) => {
console.log(`${progress}%`)
})
.on('end', () => {
console.log('done')
})
.on('error', (err) => {
console.log(err)
})
.pipe(fs.createWriteStream('./tempy.mp4'))