是否可以基于另一个流在 Akka-Streams 中生成流?
Is it possible to spawn Flows in Akka-Streams based on another stream?
下面的例子 BroadcastHub
可以动态生成监听同一个生产者的工人。但是这种生成必须在代码中显式完成。我想知道它是否可以编码为流中事件的反应。
在下面的示例中,我想在 spawns
流中收到 "Spawn" 消息后再生成 2 个工人。可能吗?
package com.example
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{BroadcastHub, Flow, Keep, Sink, Source}
import scala.concurrent.duration._
object TestApp extends App {
implicit val system = ActorSystem("system")
implicit val materializer = ActorMaterializer()
val ticks = Source.tick(0.second, 1.second, "Tick").take(10)
val broadcaster = ticks.toMat(BroadcastHub.sink(bufferSize = 16))(Keep.right).run()
def prefixFlow(tag:String) = Flow[String].map(_ + s" from $tag").to(Sink.foreach(println))
// Print out messages from the producer in two independent consumers
broadcaster.runWith(prefixFlow("1"))
broadcaster.runWith(prefixFlow("2"))
// Is it possible to spawn more flows based on another stream?
val spawns = Source.tick(2.second, 3.second, "Spawn").take(2)
// spawns.foreach(broadcaster.runWith(prefixFlow("XXX"))
}
在您的具体示例中,您是否正在寻找一个简单的 map
?
val spawns = Source
.tick(2.second, 3.second, "Spawn")
.take(2)
.map(_ ⇒ broadcaster.runWith(prefixFlow("XXX")))
.runWith(Sink.ignore)
下面的例子 BroadcastHub
可以动态生成监听同一个生产者的工人。但是这种生成必须在代码中显式完成。我想知道它是否可以编码为流中事件的反应。
在下面的示例中,我想在 spawns
流中收到 "Spawn" 消息后再生成 2 个工人。可能吗?
package com.example
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{BroadcastHub, Flow, Keep, Sink, Source}
import scala.concurrent.duration._
object TestApp extends App {
implicit val system = ActorSystem("system")
implicit val materializer = ActorMaterializer()
val ticks = Source.tick(0.second, 1.second, "Tick").take(10)
val broadcaster = ticks.toMat(BroadcastHub.sink(bufferSize = 16))(Keep.right).run()
def prefixFlow(tag:String) = Flow[String].map(_ + s" from $tag").to(Sink.foreach(println))
// Print out messages from the producer in two independent consumers
broadcaster.runWith(prefixFlow("1"))
broadcaster.runWith(prefixFlow("2"))
// Is it possible to spawn more flows based on another stream?
val spawns = Source.tick(2.second, 3.second, "Spawn").take(2)
// spawns.foreach(broadcaster.runWith(prefixFlow("XXX"))
}
在您的具体示例中,您是否正在寻找一个简单的 map
?
val spawns = Source
.tick(2.second, 3.second, "Spawn")
.take(2)
.map(_ ⇒ broadcaster.runWith(prefixFlow("XXX")))
.runWith(Sink.ignore)