使用来自 scalaz-stream 的 `halt` 时 `wye` 组合器的副作用
side-effects for `wye` combinators when using `halt` from scalaz-stream
filter
(在内部使用 halt
)终止其他分支,即使它有一些副作用:
scala> val p = Process("1","2", "3")
scala> val p1 = p.filter(_ => true).map(_ + "p1").observe(io.stdOutLines)
scala> val p2 = p.filter(_ => false).map(_ + "p2").observe(io.stdOutLines)
scala> (p1 yip p2).run.run
1p1
scala> val p2 = p.filter(_ => true).map(_ + "p1").observe(io.stdOutLines)
scala> (p1 yip p2).run.run
1p1
1p2
2p1
2p2
3p1
3p2
似乎合乎逻辑,因为在 filter
之后没有值返回给 yip
。但是用 observe
指定的副作用呢?
我目前的解决方案是使用flatMap
来指定默认值:
scala> val p1 = p.map(_ + "p1").flatMap(x => Process.emit(x).observe(io.stdOutLines))
scala> val p2 = p.map(_ + "p2").flatMap(x => Process.emit(""))
scala> (p1 yip p2).run.run
1p1
2p1
3p1
但也许有一种方法可以使用 filter
?
P.S。 merge
组合器对其他分支执行副作用(因为它不需要返回值),但如果一个分支停止它不会等待其他分支(即使它有副作用)。
为了 运行 即使在 p2 终止后的效果也需要明确 default
行为。所以大概有这些解决方案:
- 定义
p2
以在终止后提供默认值
- 如果我们真的不需要元组,请使用
either
星形向左和向右移动
也许 (1) 更接近问题,代码将如下所示:
val p = Process("1","2", "3")
val p1 = p.filter(_ => true).map(_ + "p1").observe(io.stdOutLines)
val p2 = p.filter(_ => false).map(_ + "p2")
.observe(io.stdOutLines).map(Some(_)) ++ emit(None).repeat
// alternativelly
// val p2 = p.map { v => if (pred(v)) right(v) else left(v) }
// .observeO(o.stdOutLines).flatMap { _.toOption }
// ++ emit(None).repeat
(p1 yip p2).run.run
其实应该是这样的:
in.map(emit).flatMap{ p =>
val p1 = p.map(_ + "p1").filter(_ => true).observe(out)
val p2 = p.map(_ + "p2").filter(_ => false).observe(out)
p1 merge p2
}.run.run
它使所有的副作用都井然有序,因为 filter
不能得到超过一个值(由 emit 产生)
filter
(在内部使用 halt
)终止其他分支,即使它有一些副作用:
scala> val p = Process("1","2", "3")
scala> val p1 = p.filter(_ => true).map(_ + "p1").observe(io.stdOutLines)
scala> val p2 = p.filter(_ => false).map(_ + "p2").observe(io.stdOutLines)
scala> (p1 yip p2).run.run
1p1
scala> val p2 = p.filter(_ => true).map(_ + "p1").observe(io.stdOutLines)
scala> (p1 yip p2).run.run
1p1
1p2
2p1
2p2
3p1
3p2
似乎合乎逻辑,因为在 filter
之后没有值返回给 yip
。但是用 observe
指定的副作用呢?
我目前的解决方案是使用flatMap
来指定默认值:
scala> val p1 = p.map(_ + "p1").flatMap(x => Process.emit(x).observe(io.stdOutLines))
scala> val p2 = p.map(_ + "p2").flatMap(x => Process.emit(""))
scala> (p1 yip p2).run.run
1p1
2p1
3p1
但也许有一种方法可以使用 filter
?
P.S。 merge
组合器对其他分支执行副作用(因为它不需要返回值),但如果一个分支停止它不会等待其他分支(即使它有副作用)。
为了 运行 即使在 p2 终止后的效果也需要明确 default
行为。所以大概有这些解决方案:
- 定义
p2
以在终止后提供默认值 - 如果我们真的不需要元组,请使用
either
星形向左和向右移动
也许 (1) 更接近问题,代码将如下所示:
val p = Process("1","2", "3")
val p1 = p.filter(_ => true).map(_ + "p1").observe(io.stdOutLines)
val p2 = p.filter(_ => false).map(_ + "p2")
.observe(io.stdOutLines).map(Some(_)) ++ emit(None).repeat
// alternativelly
// val p2 = p.map { v => if (pred(v)) right(v) else left(v) }
// .observeO(o.stdOutLines).flatMap { _.toOption }
// ++ emit(None).repeat
(p1 yip p2).run.run
其实应该是这样的:
in.map(emit).flatMap{ p =>
val p1 = p.map(_ + "p1").filter(_ => true).observe(out)
val p2 = p.map(_ + "p2").filter(_ => false).observe(out)
p1 merge p2
}.run.run
它使所有的副作用都井然有序,因为 filter
不能得到超过一个值(由 emit 产生)