highlandjs中的循环数据流
Circular data flow in highlandjs
受到NoFlo.js的启发,我正在学习highland.js。我希望能够让流递归运行。在这个人为设计的示例中,我将提供一个乘以 2 的数字,然后我们过滤结果 <= 512。一旦数字相乘,它就会反馈到系统中。我的代码有效,但如果我在管道中取出 doto 函数,它不会处理任何数字。我怀疑我错误地将数据发送回 returnPipe。有没有更好的方法将数据通过管道传回系统?我错过了什么?
###
input>--m--->multiplyBy2>---+
| |
| |
+---<returnPipe<----+
###
H = require('highland')
input = H([1])
returnPipe = H.pipeline(
H.doto((v)->console.log(v))
)
H.merge([input,returnPipe])
.map((v)-> return v * 2)
.filter((v)-> return v <= 512)
.pipe(returnPipe)
来自文档:doto
在 re-emitting the source stream 时分拆流。这意味着就管道而言,有一个函数仍在通过它传递流。如果您取出 doto
,则原始流不会在下一次迭代时通过 return 流返回。
如果您要使用管道,则必须向它传递一个接受流并发出流的方法。例如,您可以在对 H.pipeline
的调用中将 doto
方法替换为类似 H.map((v)=>{console.log(v); return v;})
的方法,因为该方法使用流 并发出流 ,当流在 .pipe(returnPipe)
传回时,它将继续流动
编辑:为了回答您的问题,当您声明 let input = H([1])
时,您实际上是在创建流。您可以删除对管道和 returnPipe 的任何引用,并使用以下代码生成相同的输出:
let input = H([1]);
input.map((v)=> {
return v * 2;
})
.filter((v)=> {
if (v <= 512) {
console.log(v);
}
return v <= 512;
})
.pipe(input);
我的初衷是在highland.js中写一个递归文件reader。我 posted 加入了 highland.js github 问题列表,Victor Vu 帮助我将其整理成一篇精彩的文章。
H = require('highland')
fs = require('fs')
fsPath = require('path')
###
directory >---m----------> dirFilesStream >-------------f----> out
| |
| |
+-------------< returnPipe <--------------+
legend: (m)erge (f)ork
+ directory has the initial file
+ dirListStream does a directory listing
+ out prints out the full path of the file
+ directoryFilter runs stat and filters on directories
+ returnPipe the only way i can
###
directory = H(['someDirectory'])
mergePoint = H()
dirFilesStream = mergePoint.merge().flatMap((parentPath) ->
H.wrapCallback(fs.readdir)(parentPath).sequence().map (path) ->
fsPath.join parentPath, path
)
out = dirFilesStream
# Create the return pipe without using pipe!
returnPipe = dirFilesStream.observe().flatFilter((path) ->
H.wrapCallback(fs.stat)(path).map (v) ->
v.isDirectory()
)
# Connect up the merge point now that we have all of our streams.
mergePoint.write directory
mergePoint.write returnPipe
mergePoint.end()
# Release backpressure.
out.each H.log
受到NoFlo.js的启发,我正在学习highland.js。我希望能够让流递归运行。在这个人为设计的示例中,我将提供一个乘以 2 的数字,然后我们过滤结果 <= 512。一旦数字相乘,它就会反馈到系统中。我的代码有效,但如果我在管道中取出 doto 函数,它不会处理任何数字。我怀疑我错误地将数据发送回 returnPipe。有没有更好的方法将数据通过管道传回系统?我错过了什么?
###
input>--m--->multiplyBy2>---+
| |
| |
+---<returnPipe<----+
###
H = require('highland')
input = H([1])
returnPipe = H.pipeline(
H.doto((v)->console.log(v))
)
H.merge([input,returnPipe])
.map((v)-> return v * 2)
.filter((v)-> return v <= 512)
.pipe(returnPipe)
来自文档:doto
在 re-emitting the source stream 时分拆流。这意味着就管道而言,有一个函数仍在通过它传递流。如果您取出 doto
,则原始流不会在下一次迭代时通过 return 流返回。
如果您要使用管道,则必须向它传递一个接受流并发出流的方法。例如,您可以在对 H.pipeline
的调用中将 doto
方法替换为类似 H.map((v)=>{console.log(v); return v;})
的方法,因为该方法使用流 并发出流 ,当流在 .pipe(returnPipe)
编辑:为了回答您的问题,当您声明 let input = H([1])
时,您实际上是在创建流。您可以删除对管道和 returnPipe 的任何引用,并使用以下代码生成相同的输出:
let input = H([1]);
input.map((v)=> {
return v * 2;
})
.filter((v)=> {
if (v <= 512) {
console.log(v);
}
return v <= 512;
})
.pipe(input);
我的初衷是在highland.js中写一个递归文件reader。我 posted 加入了 highland.js github 问题列表,Victor Vu 帮助我将其整理成一篇精彩的文章。
H = require('highland')
fs = require('fs')
fsPath = require('path')
###
directory >---m----------> dirFilesStream >-------------f----> out
| |
| |
+-------------< returnPipe <--------------+
legend: (m)erge (f)ork
+ directory has the initial file
+ dirListStream does a directory listing
+ out prints out the full path of the file
+ directoryFilter runs stat and filters on directories
+ returnPipe the only way i can
###
directory = H(['someDirectory'])
mergePoint = H()
dirFilesStream = mergePoint.merge().flatMap((parentPath) ->
H.wrapCallback(fs.readdir)(parentPath).sequence().map (path) ->
fsPath.join parentPath, path
)
out = dirFilesStream
# Create the return pipe without using pipe!
returnPipe = dirFilesStream.observe().flatFilter((path) ->
H.wrapCallback(fs.stat)(path).map (v) ->
v.isDirectory()
)
# Connect up the merge point now that we have all of our streams.
mergePoint.write directory
mergePoint.write returnPipe
mergePoint.end()
# Release backpressure.
out.each H.log