如何延迟流读取调用
How to defer stream read invocation
总的来说,我仍在努力通过 streams
。我已经能够使用 multiparty 从 form.on('part')
中流式传输一个大文件。但我需要推迟调用并在读取之前解析流。我试过 PassThrough
、through
。 through2
,但得到了不同的结果,主要是挂起,我不知道该怎么做,也不知道调试步骤。我对所有选择持开放态度。感谢所有见解。
import multiparty from 'multiparty'
import {
PassThrough
} from 'stream';
import through from 'through'
import through2 from 'through2'
export function promisedMultiparty(req) {
return new Promise((resolve, reject) => {
const form = new multiparty.Form()
const form_files = []
let q_str = ''
form.on('field', (fieldname, value) => {
if (value) q_str = appendQStr(fieldname, value, q_str)
})
form.on('part', async (part) => {
if (part.filename) {
const pass1 = new PassThrough() // this hangs at 10%
const pass2 = through(function write(data) { // this hangs from the beginning
this.queue(data)
},
function end() {
this.queue(null)
})
const pass3 = through2() // this hangs at 10%
/*
// This way works for large files, but I want to defer
// invocation
const form_data = new FormData()
form_data.append(savepath, part, {
filename,
})
const r = request.post(url, {
headers: {
'transfer-encoding': 'chunked'
}
}, responseCallback(resolve))
r._form = form
*/
form_files.push({
part: part.pipe(pass1),
// part: part.pipe(pass2),
// part: part.pipe(pass3),
})
} else {
part.resume()
}
})
form.on('close', () => {
resolve({
fields: qs.parse(q_str),
forms: form_files,
})
})
form.parse(req)
})
}
p.s。如果有人可以使用正确的术语,请肯定标题会更好。谢谢。
我认为这是因为您没有正确使用 through2
- 即在缓冲区已满后实际上并没有清空缓冲区(这就是为什么它在较大的文件上挂起 10%,但在较小的文件上有效)。
我相信像这样的实现应该做到这一点:
const pass2 = through2(function(chunk, encoding, next) {
// do something with the data
// Use this only if you want to send the data further to another stream reader
// Note - From your implementation you don't seem to need it
// this.push(data)
// This is what tells through2 it's ready to empty the
// buffer and read more data
next();
})
总的来说,我仍在努力通过 streams
。我已经能够使用 multiparty 从 form.on('part')
中流式传输一个大文件。但我需要推迟调用并在读取之前解析流。我试过 PassThrough
、through
。 through2
,但得到了不同的结果,主要是挂起,我不知道该怎么做,也不知道调试步骤。我对所有选择持开放态度。感谢所有见解。
import multiparty from 'multiparty'
import {
PassThrough
} from 'stream';
import through from 'through'
import through2 from 'through2'
export function promisedMultiparty(req) {
return new Promise((resolve, reject) => {
const form = new multiparty.Form()
const form_files = []
let q_str = ''
form.on('field', (fieldname, value) => {
if (value) q_str = appendQStr(fieldname, value, q_str)
})
form.on('part', async (part) => {
if (part.filename) {
const pass1 = new PassThrough() // this hangs at 10%
const pass2 = through(function write(data) { // this hangs from the beginning
this.queue(data)
},
function end() {
this.queue(null)
})
const pass3 = through2() // this hangs at 10%
/*
// This way works for large files, but I want to defer
// invocation
const form_data = new FormData()
form_data.append(savepath, part, {
filename,
})
const r = request.post(url, {
headers: {
'transfer-encoding': 'chunked'
}
}, responseCallback(resolve))
r._form = form
*/
form_files.push({
part: part.pipe(pass1),
// part: part.pipe(pass2),
// part: part.pipe(pass3),
})
} else {
part.resume()
}
})
form.on('close', () => {
resolve({
fields: qs.parse(q_str),
forms: form_files,
})
})
form.parse(req)
})
}
p.s。如果有人可以使用正确的术语,请肯定标题会更好。谢谢。
我认为这是因为您没有正确使用 through2
- 即在缓冲区已满后实际上并没有清空缓冲区(这就是为什么它在较大的文件上挂起 10%,但在较小的文件上有效)。
我相信像这样的实现应该做到这一点:
const pass2 = through2(function(chunk, encoding, next) {
// do something with the data
// Use this only if you want to send the data further to another stream reader
// Note - From your implementation you don't seem to need it
// this.push(data)
// This is what tells through2 it's ready to empty the
// buffer and read more data
next();
})