highlandjs中的循环数据流

Circular data flow in highlandjs

受到NoFlo.js的启发,我正在学习highland.js。我希望能够让流递归运行。在这个人为设计的示例中,我将提供一个乘以 2 的数字,然后我们过滤结果 <= 512。一旦数字相乘,它就会反馈到系统中。我的代码有效,但如果我在管道中取出 doto 函数,它不会处理任何数字。我怀疑我错误地将数据发送回 returnPipe。有没有更好的方法将数据通过管道传回系统?我错过了什么?

###
  input>--m--->multiplyBy2>---+
          |                   |
          |                   |
          +---<returnPipe<----+
###

H = require('highland')

input = H([1])
returnPipe = H.pipeline(
  H.doto((v)->console.log(v))
)
H.merge([input,returnPipe])
 .map((v)-> return v * 2)
 .filter((v)-> return v <= 512)
 .pipe(returnPipe)

来自文档:dotore-emitting the source stream 时分拆流。这意味着就管道而言,有一个函数仍在通过它传递流。如果您取出 doto,则原始流不会在下一次迭代时通过 return 流返回。

如果您要使用管道,则必须向它传递一个接受流并发出流的方法。例如,您可以在对 H.pipeline 的调用中将 doto 方法替换为类似 H.map((v)=>{console.log(v); return v;}) 的方法,因为该方法使用流 并发出流 ,当流在 .pipe(returnPipe)

传回时,它将继续流动

编辑:为了回答您的问题,当您声明 let input = H([1]) 时,您实际上是在创建流。您可以删除对管道和 returnPipe 的任何引用,并使用以下代码生成相同的输出:

let input = H([1]);

input.map((v)=> {
  return v * 2;
})
.filter((v)=> {
  if (v <= 512) {
    console.log(v);
  }
  return v <= 512;
})
.pipe(input);

我的初衷是在highland.js中写一个递归文件reader。我 posted 加入了 highland.js github 问题列表,Victor Vu 帮助我将其整理成一篇精彩的文章。

H = require('highland')
fs = require('fs')
fsPath = require('path')

###
  directory >---m----------> dirFilesStream >-------------f----> out
                |                                         |
                |                                         |
                +-------------< returnPipe <--------------+

  legend: (m)erge  (f)ork

 + directory         has the initial file
 + dirListStream     does a directory listing
 + out               prints out the full path of the file
 + directoryFilter   runs stat and filters on directories
 + returnPipe        the only way i can

###

directory = H(['someDirectory'])
mergePoint = H()
dirFilesStream = mergePoint.merge().flatMap((parentPath) ->
  H.wrapCallback(fs.readdir)(parentPath).sequence().map (path) ->
    fsPath.join parentPath, path
)
out = dirFilesStream
# Create the return pipe without using pipe!
returnPipe = dirFilesStream.observe().flatFilter((path) ->
  H.wrapCallback(fs.stat)(path).map (v) ->
    v.isDirectory()
)
# Connect up the merge point now that we have all of our streams.
mergePoint.write directory
mergePoint.write returnPipe
mergePoint.end()
# Release backpressure.
out.each H.log