Akka Streams 中的 Monadic 短路
Monadic short-circuiting in Akka Streams
我想链接一系列形式为 a -> Try[b]
的 Flow
,其中每个连续的阶段处理前一个 Success
的情况,以及 Sink
最后一般处理所有 Failure
。
这个或类似的东西可以简洁地编码吗?其实是一个线性流,但我不确定每个阶段的广播和合并有多短。
解决这个问题的一种方法是定义一个扇出阶段,根据其结果将 Try
分成 2 个流
object PartitionTry {
def apply[T]() = GraphDSL.create[FanOutShape2[Try[T], Throwable, T]]() { implicit builder ⇒
import GraphDSL.Implicits._
val success = builder.add(Flow[Try[T]].collect { case Success(a) ⇒ a })
val failure = builder.add(Flow[Try[T]].collect { case Failure(t) ⇒ t })
val partition = builder.add(Partition[Try[T]](2, _.fold(_ ⇒ 0, _ ⇒ 1)))
partition ~> failure
partition ~> success
new FanOutShape2[Try[T], Throwable, T](partition.in, failure.out, success.out)
}
}
然后您的通用流可以摄取 Try
s 并将 Failure
s 发送到选择的接收器,同时将 Success
es 传递到
object ErrorHandlingFlow {
def apply[T, MatErr](errorSink: Sink[Throwable, MatErr]): Flow[Try[T], T, MatErr] = Flow.fromGraph(
GraphDSL.create(errorSink) { implicit builder ⇒ sink ⇒
import GraphDSL.Implicits._
val partition = builder.add(PartitionTry[T]())
partition.out0 ~> sink
new FlowShape[Try[T], T](partition.in, partition.out1)
}
)
}
使用示例如下
val source : Source[String, NotUsed] = Source(List("1", "2", "hello"))
val convert : Flow[String, Try[Int], NotUsed] = Flow.fromFunction((s: String) ⇒ Try{s.toInt})
val errorsSink : Sink[Throwable, Future[Done]] = Sink.foreach[Throwable](println)
val handleErrors: Flow[Try[Int], Int, Future[Done]] = ErrorHandlingFlow(errorsSink)
source.via(convert).via(handleErrors).runForeach(println)
注意
- 上面定义的 2 个阶段可重复用于任何类型(一次编写,随处使用)
- 此方法可重复用于其他类型 类 - 例如
Either
等
我想链接一系列形式为 a -> Try[b]
的 Flow
,其中每个连续的阶段处理前一个 Success
的情况,以及 Sink
最后一般处理所有 Failure
。
这个或类似的东西可以简洁地编码吗?其实是一个线性流,但我不确定每个阶段的广播和合并有多短。
解决这个问题的一种方法是定义一个扇出阶段,根据其结果将 Try
分成 2 个流
object PartitionTry {
def apply[T]() = GraphDSL.create[FanOutShape2[Try[T], Throwable, T]]() { implicit builder ⇒
import GraphDSL.Implicits._
val success = builder.add(Flow[Try[T]].collect { case Success(a) ⇒ a })
val failure = builder.add(Flow[Try[T]].collect { case Failure(t) ⇒ t })
val partition = builder.add(Partition[Try[T]](2, _.fold(_ ⇒ 0, _ ⇒ 1)))
partition ~> failure
partition ~> success
new FanOutShape2[Try[T], Throwable, T](partition.in, failure.out, success.out)
}
}
然后您的通用流可以摄取 Try
s 并将 Failure
s 发送到选择的接收器,同时将 Success
es 传递到
object ErrorHandlingFlow {
def apply[T, MatErr](errorSink: Sink[Throwable, MatErr]): Flow[Try[T], T, MatErr] = Flow.fromGraph(
GraphDSL.create(errorSink) { implicit builder ⇒ sink ⇒
import GraphDSL.Implicits._
val partition = builder.add(PartitionTry[T]())
partition.out0 ~> sink
new FlowShape[Try[T], T](partition.in, partition.out1)
}
)
}
使用示例如下
val source : Source[String, NotUsed] = Source(List("1", "2", "hello"))
val convert : Flow[String, Try[Int], NotUsed] = Flow.fromFunction((s: String) ⇒ Try{s.toInt})
val errorsSink : Sink[Throwable, Future[Done]] = Sink.foreach[Throwable](println)
val handleErrors: Flow[Try[Int], Int, Future[Done]] = ErrorHandlingFlow(errorsSink)
source.via(convert).via(handleErrors).runForeach(println)
注意
- 上面定义的 2 个阶段可重复用于任何类型(一次编写,随处使用)
- 此方法可重复用于其他类型 类 - 例如
Either
等