Akka Stream - 简单 source/sink 示例入口和出口不对应
Akka Stream - simple source/sink example inlets and outlets do not correspond
我开始学习 Akka Stream。我有一个问题,我将其简化为:
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ClosedShape}
import akka.stream.scaladsl.{GraphDSL, RunnableGraph, Sink, Source}
object Test extends App {
val graph = GraphDSL.create() { implicit b =>
val in = Source.fromIterator(() => (1 to 10).iterator.map(_.toDouble))
b.add(in)
val out = Sink.foreach[Double] { d =>
println(s"elem: $d")
}
b.add(out)
in.to(out)
ClosedShape
}
implicit val system = ActorSystem()
implicit val mat = ActorMaterializer()
val rg = RunnableGraph.fromGraph(graph)
rg.run()
}
这会引发运行时异常:
Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: The inlets [] and outlets [] must correspond to the inlets [map.in] and outlets [StatefulMapConcat.out]
问题是,在我的实际情况下,我无法使用 GraphDSL.Implicits
中的 ~>
运算符,因为 Source
和 Flow
没有通用的超类型(我的图表是从另一个 DSL 创建的,而不是在一个地方创建的)。所以我只能用b.add
和in.to(out)
.
看来必须使用从 builder.add
:
得到的特殊 "copy" 插座
val graph = GraphDSL.create() { implicit b =>
val in = Source.fromIterator(() => (1 to 10).iterator.map(_.toDouble))
val out = Sink.foreach[Double] { d =>
println(s"elem: $d")
}
import GraphDSL.Implicits._
val inOutlet = b.add(in).out
// ... pass inOutlet around until ...
inOutlet ~> out
ClosedShape
}
我开始学习 Akka Stream。我有一个问题,我将其简化为:
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ClosedShape}
import akka.stream.scaladsl.{GraphDSL, RunnableGraph, Sink, Source}
object Test extends App {
val graph = GraphDSL.create() { implicit b =>
val in = Source.fromIterator(() => (1 to 10).iterator.map(_.toDouble))
b.add(in)
val out = Sink.foreach[Double] { d =>
println(s"elem: $d")
}
b.add(out)
in.to(out)
ClosedShape
}
implicit val system = ActorSystem()
implicit val mat = ActorMaterializer()
val rg = RunnableGraph.fromGraph(graph)
rg.run()
}
这会引发运行时异常:
Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: The inlets [] and outlets [] must correspond to the inlets [map.in] and outlets [StatefulMapConcat.out]
问题是,在我的实际情况下,我无法使用 GraphDSL.Implicits
中的 ~>
运算符,因为 Source
和 Flow
没有通用的超类型(我的图表是从另一个 DSL 创建的,而不是在一个地方创建的)。所以我只能用b.add
和in.to(out)
.
看来必须使用从 builder.add
:
val graph = GraphDSL.create() { implicit b =>
val in = Source.fromIterator(() => (1 to 10).iterator.map(_.toDouble))
val out = Sink.foreach[Double] { d =>
println(s"elem: $d")
}
import GraphDSL.Implicits._
val inOutlet = b.add(in).out
// ... pass inOutlet around until ...
inOutlet ~> out
ClosedShape
}