child_process 流背压
child_process stream backpressure
我正在使用 exec-stream
with Node.js and piping that stream through a few other transform streams, eventually through a node-brake
流来限制数据速率。 braking stream好像没有效果,实际上数据最终在长链的末端丢失了。
execStream('some-external-binary').pipe(transform1).pipe(transform2).pipe(brake(1024))
我认为正在发生的事情是 child_process
STDOUT
流(在 exec-stream
内)没有暂停,因此缓冲区会填满直到数据丢失。
child_process
流会这样吗?有什么方法可以让背压与 child_process
流一起正常工作?
我对 exec-stream 和 node-brake 不够熟悉,无法理解数据丢失的所有途径。
但是,我做了一个小实验,看看 node-brake 是否有背压效应,你提到这可能是数据丢失的潜在区域。
文件也托管在 Gist。
###
Created for
Please pardon the CoffeeScript, but I couldn't stand to extend stream.Transform in native JavaScript.
###
fs = require("fs")
execStream = require("exec-stream")
brake = require("brake")
file = fs.createWriteStream("tmp.txt")
class Double extends require("stream").Transform
_transform: (chunk, enc, cb) ->
@_last ?= Date.now()
@_called ?= []
@_called.push Date.now() - @_last
@_last = Date.now()
@push chunk.toString() + chunk.toString()
cb()
class UpperCase extends require("stream").Transform
_transform: (chunk, enc, cb) ->
@push chunk.toString().toUpperCase()
cb()
sum = (nums) ->
o = 0
o += i for i in nums
o
doTest = (size) ->
transform1 = new Double()
transform2 = new UpperCase()
transform3 = new Double()
execStream("dd", ["if=/dev/urandom", "bs=1024", "count=1"])
.pipe(transform1)
.pipe(transform2)
.pipe(brake(size))
.pipe(transform3)
.pipe(file)
file.on "finish", ->
fs.stat "tmp.txt", (err, stats) ->
throw err if err
called1 = transform1._called
averagePreBrake = sum(called1) / called1.length
called2 = transform3._called
averagePostBrake = sum(called2) / called2.length
console.log """
Generated with brake(#{size}): #{stats.size}
Average time between transformations pre-brake: #{averagePreBrake}ms
Average time between transformations post-brake: #{averagePostBrake}ms
"""
doTest 1024
doTest 256
处理结果如下。
我注意到刹车前的转换之间没有间隙。然而,刹车会扰乱后续的转型。鉴于此数据,我怀疑 node-brake 没有背压效应。
[我的 sh2png 实用程序生成的控制台输出的屏幕截图]
我正在使用 exec-stream
with Node.js and piping that stream through a few other transform streams, eventually through a node-brake
流来限制数据速率。 braking stream好像没有效果,实际上数据最终在长链的末端丢失了。
execStream('some-external-binary').pipe(transform1).pipe(transform2).pipe(brake(1024))
我认为正在发生的事情是 child_process
STDOUT
流(在 exec-stream
内)没有暂停,因此缓冲区会填满直到数据丢失。
child_process
流会这样吗?有什么方法可以让背压与 child_process
流一起正常工作?
我对 exec-stream 和 node-brake 不够熟悉,无法理解数据丢失的所有途径。
但是,我做了一个小实验,看看 node-brake 是否有背压效应,你提到这可能是数据丢失的潜在区域。
文件也托管在 Gist。
###
Created for
Please pardon the CoffeeScript, but I couldn't stand to extend stream.Transform in native JavaScript.
###
fs = require("fs")
execStream = require("exec-stream")
brake = require("brake")
file = fs.createWriteStream("tmp.txt")
class Double extends require("stream").Transform
_transform: (chunk, enc, cb) ->
@_last ?= Date.now()
@_called ?= []
@_called.push Date.now() - @_last
@_last = Date.now()
@push chunk.toString() + chunk.toString()
cb()
class UpperCase extends require("stream").Transform
_transform: (chunk, enc, cb) ->
@push chunk.toString().toUpperCase()
cb()
sum = (nums) ->
o = 0
o += i for i in nums
o
doTest = (size) ->
transform1 = new Double()
transform2 = new UpperCase()
transform3 = new Double()
execStream("dd", ["if=/dev/urandom", "bs=1024", "count=1"])
.pipe(transform1)
.pipe(transform2)
.pipe(brake(size))
.pipe(transform3)
.pipe(file)
file.on "finish", ->
fs.stat "tmp.txt", (err, stats) ->
throw err if err
called1 = transform1._called
averagePreBrake = sum(called1) / called1.length
called2 = transform3._called
averagePostBrake = sum(called2) / called2.length
console.log """
Generated with brake(#{size}): #{stats.size}
Average time between transformations pre-brake: #{averagePreBrake}ms
Average time between transformations post-brake: #{averagePostBrake}ms
"""
doTest 1024
doTest 256
处理结果如下。
我注意到刹车前的转换之间没有间隙。然而,刹车会扰乱后续的转型。鉴于此数据,我怀疑 node-brake 没有背压效应。
[我的 sh2png 实用程序生成的控制台输出的屏幕截图]