文档中的 Akka Streams 循环示例不起作用
Akka Streams cycle example in docs not working
我正在尝试构建一个带有简单循环的 Akka Stream。在阅读了文档 here 但没有成功之后,我尝试只复制示例代码作为起始基础,但这也行不通。代码编译(在包含示例中缺少的源代码之后)但没有打印出任何内容。看起来好像有什么东西永远背压,但我不明白为什么。
这是我的代码,如有任何帮助,我们将不胜感激:
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ActorMaterializerSettings}
import akka.stream.scaladsl._
import akka.stream.ClosedShape
object Simulate {
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
def main(args: Array[String]): Unit = {
// Define simulation flowgraph
val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
import b._
import GraphDSL.Implicits._
val source = add(Source.repeat[Int](1))
val zip = add(ZipWith[Int, Int, Int]((left, right) => left))
val bcast = add(Broadcast[Int](2))
val concat = add(Concat[Int]())
val start = add(Source.single[Int](0))
val sink = add(Sink.ignore)
source ~> zip.in0
zip.out.map { s => println(s); s } ~> bcast ~> sink
concat <~ bcast
zip.in1 <~ concat <~ start
ClosedShape
})
g.run()
}
}
编辑:实际上问题似乎不是添加缓冲区,而是声明了订单入口/出口。
这个有效:
val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val source = Source.repeat(1)
val start = Source.single(0)
val zip = b.add(ZipWith((left: Int, right: Int) => left))
val bcast = b.add(Broadcast[Int](2))
val concat = b.add(Concat[Int]())
source ~> zip.in0
zip.out.map { s => println(s); s } ~> bcast ~> Sink.ignore
zip.in1 <~ concat <~ start
concat <~ bcast
ClosedShape
})
g.run()
zip.in1 <~ concat <~ start
和 concat <~ bcast
的顺序与文档中的一致。
我正在尝试构建一个带有简单循环的 Akka Stream。在阅读了文档 here 但没有成功之后,我尝试只复制示例代码作为起始基础,但这也行不通。代码编译(在包含示例中缺少的源代码之后)但没有打印出任何内容。看起来好像有什么东西永远背压,但我不明白为什么。
这是我的代码,如有任何帮助,我们将不胜感激:
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ActorMaterializerSettings}
import akka.stream.scaladsl._
import akka.stream.ClosedShape
object Simulate {
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
def main(args: Array[String]): Unit = {
// Define simulation flowgraph
val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
import b._
import GraphDSL.Implicits._
val source = add(Source.repeat[Int](1))
val zip = add(ZipWith[Int, Int, Int]((left, right) => left))
val bcast = add(Broadcast[Int](2))
val concat = add(Concat[Int]())
val start = add(Source.single[Int](0))
val sink = add(Sink.ignore)
source ~> zip.in0
zip.out.map { s => println(s); s } ~> bcast ~> sink
concat <~ bcast
zip.in1 <~ concat <~ start
ClosedShape
})
g.run()
}
}
编辑:实际上问题似乎不是添加缓冲区,而是声明了订单入口/出口。
这个有效:
val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val source = Source.repeat(1)
val start = Source.single(0)
val zip = b.add(ZipWith((left: Int, right: Int) => left))
val bcast = b.add(Broadcast[Int](2))
val concat = b.add(Concat[Int]())
source ~> zip.in0
zip.out.map { s => println(s); s } ~> bcast ~> Sink.ignore
zip.in1 <~ concat <~ start
concat <~ bcast
ClosedShape
})
g.run()
zip.in1 <~ concat <~ start
和 concat <~ bcast
的顺序与文档中的一致。