如何减少自定义 GraphStage 的 Seq `.via()`?

How to reduce `.via()` down a Seq of custom GraphStage?

我有 GraphStage 的具体子 class,它定义了一些受 class 参数影响的自定义逻辑。

我希望我的应用程序的用户能够提供 Seq 这些自定义 GraphStages。在构建 RunnableGraph 时,我想在 SourceSeq 的第一阶段之间添加边,然后按顺序在每个阶段之间添加边,最后是 Sink。换句话说:src ~> stages.reduce(_ ~> _) ~> sink

不幸的是,这无法编译。我认为原因可能与运算符优先级有关。我尝试使用 .via.foldLeft 更明确,但我不太正确。

感觉这种东西应该有一个相当简单的语法。我在文档中缺少操作员吗?这种动态图是不是什么原因做不到?

下面是使用 String => String 的简单阶段制作的此模式示例。它包括我无法编译的代码,逻辑上代表了我想要表达的图形。

import akka.NotUsed
import akka.stream.scaladsl.{GraphDSL, RunnableGraph, Sink, Source}
import akka.stream.stage.{GraphStage, GraphStageLogic}
import akka.stream._

import scala.concurrent.Future

case class MyStage[T](/* ... params ... */) extends GraphStage[FlowShape[T, T]] {
  val in = Inlet[T]("MyStage.in")
  val out = Outlet[T]("MyStage.out")
  val shape: FlowShape[T, T] = FlowShape.of(in, out)
  def createLogic(inheritedAttributes: Attributes): GraphStageLogic = ??? // Depends on params
}


case class MyApp(stages: Seq[MyStage[String]]) {
  val out = Sink.seq[String]

  val graph = RunnableGraph.fromGraph(GraphDSL.create(out) { implicit b: GraphDSL.Builder[Future[Seq[String]]] =>
    sink =>
      import GraphDSL.Implicits._
      
      val src: Source[String, NotUsed] = Source(Seq("abc", "hello world", "goodbye!"))

      // This is what I logically want to do.
      src ~> stages.reduce(_ ~> _) ~> sink

      ClosedShape
  }
}

您可以像这样创建阶段流程:

     val graph = GraphDSL.create() { implicit b =>
          import GraphDSL.Implicits._

          val stagesShapes = stages.map(b.add(_))

          stagesShapes.reduce { (s1, s2) => 
            s1 ~> s2
            FlowShape(s1.in, s2.out)
          }
      }

然后您只需将源和汇连接到此流,然后 运行 它。