Akka Streams:如何从 GraphDSL API 获取 Materialized Sink 输出?
Akka Streams: How do I get Materialized Sink output from GraphDSL API?
这是一个非常简单的新手问题,使用 GraphDSL API。我阅读了几个相关的 SO 主题,但没有看到答案:
val actorSystem = ActorSystem("QuickStart")
val executor = actorSystem.dispatcher
val materializer = ActorMaterializer()(actorSystem)
val source: Source[Int, NotUsed] = Source(1 to 5)
val throttledSource = source.throttle(1, 1.second, 1, ThrottleMode.shaping)
val intDoublerFlow = Flow.fromFunction[Int, Int](i => i * 2)
val sink = Sink.foreach(println)
val graphModel = GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
throttledSource ~> intDoublerFlow ~> sink
// I presume I want to change this shape to something else
// but I can't figure out what it is.
ClosedShape
}
// TODO: This is RunnableGraph[NotUsed], I want RunnableGraph[Future[Done]] that gives the
// materialized Future[Done] from the sink. I presume I need to use a GraphDSL SourceShape
// but I can't get that working.
val graph = RunnableGraph.fromGraph(graphModel)
// This works and gives me the materialized sink output using the simpler API.
// But I want to use the GraphDSL so that I can add branches or junctures.
val graphThatIWantFromDslAPI = throttledSource.toMat(sink)(Keep.right)
val graphModel = GraphDSL.create(sink) { implicit b: Builder[Future[Done]] => sink =>
import akka.stream.scaladsl.GraphDSL.Implicits._
throttledSource ~> intDoublerFlow ~> sink
ClosedShape
}
val graph: RunnableGraph[Future[Done]] = RunnableGraph.fromGraph(graphModel)
val graphThatIWantFromDslAPI: RunnableGraph[Future[Done]] = throttledSource.toMat(sink)(Keep.right)
GraphDSL API 的问题是,隐式生成器严重超载。您需要将接收器包裹在 create
中,这会将 Builder[NotUsed]
变成 Builder[Future[Done]]
并且现在表示来自 builder => sink => shape
而不是 builder => shape
.[=16= 的函数]
诀窍是将您想要的物化值(在您的情况下为 sink
)传递给 GraphDSL.create
。作为第二个参数传递的函数也会发生变化,需要一个 Shape
输入参数(下例中的 s
),您可以在图表中使用它。
val graphModel: Graph[ClosedShape, Future[Done]] = GraphDSL.create(sink) { implicit b => s =>
import GraphDSL.Implicits._
throttledSource ~> intDoublerFlow ~> s
// ClosedShape is just fine - it is always the shape of a RunnableGraph
ClosedShape
}
val graph: RunnableGraph[Future[Done]] = RunnableGraph.fromGraph(graphModel)
可以在 docs 中找到更多信息。
这是一个非常简单的新手问题,使用 GraphDSL API。我阅读了几个相关的 SO 主题,但没有看到答案:
val actorSystem = ActorSystem("QuickStart")
val executor = actorSystem.dispatcher
val materializer = ActorMaterializer()(actorSystem)
val source: Source[Int, NotUsed] = Source(1 to 5)
val throttledSource = source.throttle(1, 1.second, 1, ThrottleMode.shaping)
val intDoublerFlow = Flow.fromFunction[Int, Int](i => i * 2)
val sink = Sink.foreach(println)
val graphModel = GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
throttledSource ~> intDoublerFlow ~> sink
// I presume I want to change this shape to something else
// but I can't figure out what it is.
ClosedShape
}
// TODO: This is RunnableGraph[NotUsed], I want RunnableGraph[Future[Done]] that gives the
// materialized Future[Done] from the sink. I presume I need to use a GraphDSL SourceShape
// but I can't get that working.
val graph = RunnableGraph.fromGraph(graphModel)
// This works and gives me the materialized sink output using the simpler API.
// But I want to use the GraphDSL so that I can add branches or junctures.
val graphThatIWantFromDslAPI = throttledSource.toMat(sink)(Keep.right)
val graphModel = GraphDSL.create(sink) { implicit b: Builder[Future[Done]] => sink =>
import akka.stream.scaladsl.GraphDSL.Implicits._
throttledSource ~> intDoublerFlow ~> sink
ClosedShape
}
val graph: RunnableGraph[Future[Done]] = RunnableGraph.fromGraph(graphModel)
val graphThatIWantFromDslAPI: RunnableGraph[Future[Done]] = throttledSource.toMat(sink)(Keep.right)
GraphDSL API 的问题是,隐式生成器严重超载。您需要将接收器包裹在 create
中,这会将 Builder[NotUsed]
变成 Builder[Future[Done]]
并且现在表示来自 builder => sink => shape
而不是 builder => shape
.[=16= 的函数]
诀窍是将您想要的物化值(在您的情况下为 sink
)传递给 GraphDSL.create
。作为第二个参数传递的函数也会发生变化,需要一个 Shape
输入参数(下例中的 s
),您可以在图表中使用它。
val graphModel: Graph[ClosedShape, Future[Done]] = GraphDSL.create(sink) { implicit b => s =>
import GraphDSL.Implicits._
throttledSource ~> intDoublerFlow ~> s
// ClosedShape is just fine - it is always the shape of a RunnableGraph
ClosedShape
}
val graph: RunnableGraph[Future[Done]] = RunnableGraph.fromGraph(graphModel)
可以在 docs 中找到更多信息。