如何减少自定义 GraphStage 的 Seq `.via()`?
How to reduce `.via()` down a Seq of custom GraphStage?
我有 GraphStage
的具体子 class,它定义了一些受 class 参数影响的自定义逻辑。
我希望我的应用程序的用户能够提供 Seq
这些自定义 GraphStages
。在构建 RunnableGraph
时,我想在 Source
和 Seq
的第一阶段之间添加边,然后按顺序在每个阶段之间添加边,最后是 Sink
。换句话说:src ~> stages.reduce(_ ~> _) ~> sink
不幸的是,这无法编译。我认为原因可能与运算符优先级有关。我尝试使用 .via
或 .foldLeft
更明确,但我不太正确。
感觉这种东西应该有一个相当简单的语法。我在文档中缺少操作员吗?这种动态图是不是什么原因做不到?
下面是使用 String => String
的简单阶段制作的此模式示例。它包括我无法编译的代码,逻辑上代表了我想要表达的图形。
import akka.NotUsed
import akka.stream.scaladsl.{GraphDSL, RunnableGraph, Sink, Source}
import akka.stream.stage.{GraphStage, GraphStageLogic}
import akka.stream._
import scala.concurrent.Future
case class MyStage[T](/* ... params ... */) extends GraphStage[FlowShape[T, T]] {
val in = Inlet[T]("MyStage.in")
val out = Outlet[T]("MyStage.out")
val shape: FlowShape[T, T] = FlowShape.of(in, out)
def createLogic(inheritedAttributes: Attributes): GraphStageLogic = ??? // Depends on params
}
case class MyApp(stages: Seq[MyStage[String]]) {
val out = Sink.seq[String]
val graph = RunnableGraph.fromGraph(GraphDSL.create(out) { implicit b: GraphDSL.Builder[Future[Seq[String]]] =>
sink =>
import GraphDSL.Implicits._
val src: Source[String, NotUsed] = Source(Seq("abc", "hello world", "goodbye!"))
// This is what I logically want to do.
src ~> stages.reduce(_ ~> _) ~> sink
ClosedShape
}
}
您可以像这样创建阶段流程:
val graph = GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val stagesShapes = stages.map(b.add(_))
stagesShapes.reduce { (s1, s2) =>
s1 ~> s2
FlowShape(s1.in, s2.out)
}
}
然后您只需将源和汇连接到此流,然后 运行 它。
我有 GraphStage
的具体子 class,它定义了一些受 class 参数影响的自定义逻辑。
我希望我的应用程序的用户能够提供 Seq
这些自定义 GraphStages
。在构建 RunnableGraph
时,我想在 Source
和 Seq
的第一阶段之间添加边,然后按顺序在每个阶段之间添加边,最后是 Sink
。换句话说:src ~> stages.reduce(_ ~> _) ~> sink
不幸的是,这无法编译。我认为原因可能与运算符优先级有关。我尝试使用 .via
或 .foldLeft
更明确,但我不太正确。
感觉这种东西应该有一个相当简单的语法。我在文档中缺少操作员吗?这种动态图是不是什么原因做不到?
下面是使用 String => String
的简单阶段制作的此模式示例。它包括我无法编译的代码,逻辑上代表了我想要表达的图形。
import akka.NotUsed
import akka.stream.scaladsl.{GraphDSL, RunnableGraph, Sink, Source}
import akka.stream.stage.{GraphStage, GraphStageLogic}
import akka.stream._
import scala.concurrent.Future
case class MyStage[T](/* ... params ... */) extends GraphStage[FlowShape[T, T]] {
val in = Inlet[T]("MyStage.in")
val out = Outlet[T]("MyStage.out")
val shape: FlowShape[T, T] = FlowShape.of(in, out)
def createLogic(inheritedAttributes: Attributes): GraphStageLogic = ??? // Depends on params
}
case class MyApp(stages: Seq[MyStage[String]]) {
val out = Sink.seq[String]
val graph = RunnableGraph.fromGraph(GraphDSL.create(out) { implicit b: GraphDSL.Builder[Future[Seq[String]]] =>
sink =>
import GraphDSL.Implicits._
val src: Source[String, NotUsed] = Source(Seq("abc", "hello world", "goodbye!"))
// This is what I logically want to do.
src ~> stages.reduce(_ ~> _) ~> sink
ClosedShape
}
}
您可以像这样创建阶段流程:
val graph = GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val stagesShapes = stages.map(b.add(_))
stagesShapes.reduce { (s1, s2) =>
s1 ~> s2
FlowShape(s1.in, s2.out)
}
}
然后您只需将源和汇连接到此流,然后 运行 它。