Akka Streams 中的 Monadic 短路

Monadic short-circuiting in Akka Streams

我想链接一系列形式为 a -> Try[b]Flow,其中每个连续的阶段处理前一个 Success 的情况,以及 Sink 最后一般处理所有 Failure

这个或类似的东西可以简洁地编码吗?其实是一个线性流,但我不确定每个阶段的广播和合并有多短。

解决这个问题的一种方法是定义一个扇出阶段,根据其结果将 Try 分成 2 个流

  object PartitionTry {
    def apply[T]() = GraphDSL.create[FanOutShape2[Try[T], Throwable, T]]() { implicit builder ⇒
      import GraphDSL.Implicits._

      val success = builder.add(Flow[Try[T]].collect { case Success(a) ⇒ a })
      val failure = builder.add(Flow[Try[T]].collect { case Failure(t) ⇒ t })
      val partition = builder.add(Partition[Try[T]](2, _.fold(_ ⇒ 0, _ ⇒ 1)))

      partition ~> failure
      partition ~> success

      new FanOutShape2[Try[T], Throwable, T](partition.in, failure.out, success.out)
    }
  }

然后您的通用流可以摄取 Trys 并将 Failures 发送到选择的接收器,同时将 Successes 传递到

  object ErrorHandlingFlow {
    def apply[T, MatErr](errorSink: Sink[Throwable, MatErr]): Flow[Try[T], T, MatErr] = Flow.fromGraph(
      GraphDSL.create(errorSink) { implicit builder ⇒ sink ⇒
        import GraphDSL.Implicits._

        val partition = builder.add(PartitionTry[T]())

        partition.out0 ~> sink

        new FlowShape[Try[T], T](partition.in, partition.out1)
      }
    )
  }

使用示例如下

  val source      : Source[String, NotUsed]           = Source(List("1", "2", "hello"))
  val convert     : Flow[String, Try[Int], NotUsed]   = Flow.fromFunction((s: String) ⇒ Try{s.toInt})
  val errorsSink  : Sink[Throwable, Future[Done]]     = Sink.foreach[Throwable](println)
  val handleErrors: Flow[Try[Int], Int, Future[Done]] = ErrorHandlingFlow(errorsSink)

  source.via(convert).via(handleErrors).runForeach(println)

注意

  • 上面定义的 2 个阶段可重复用于任何类型(一次编写,随处使用)
  • 此方法可重复用于其他类型 类 - 例如 Either