使用 Akka Stream,如何动态复制流?

With Akka Stream, how to dynamically duplicate a flow?

我是 运行 直播视频流服务器。有一个 Array[Byte] 视频源。请注意,我无法与我的视频源建立 2 个连接。我希望连接到我的服务器的每个客户端都接收到相同的流,缓冲区丢弃旧帧。

我试过像这样使用 BroadcastHub :

  val source =
    Source.fromIterator(() => myVideoStreamingSource.zipWithIndex)

  val runnableGraph =
    source.toMat(BroadcastHub.sink(bufferSize = 2))(Keep.right)

  runnableGraph.run().to(Sink.foreach { index =>
      println(s"client A reading frame #$index")
  }).run()

  runnableGraph.run().to(Sink.foreach { index =>
      println(s"client B reading frame #$index")
  }).run()

我得到:

client A reading frame #0
client B reading frame #1
client A reading frame #2
client B reading frame #3

我们看到主流在两个客户端之间进行了分区,而我希望我的两个客户端能够看到所有源流的帧。

我是不是遗漏了什么,或者还有其他解决方案吗?

问题是 IteratorBroadcastHub 的组合。我假设你 myVideoStreamingSource 是这样的:

val myVideoStreamingSource = Iterator("A","B","C","D","E")

我现在引用 BroadcastHub.Sink:

Every new materialization of the [[Sink]] results in a new, independent hub, which materializes to its own [[Source]] for consuming the [[Sink]] of that materialization.

这里的问题是它还没有使用迭代器中的数据。

迭代器的问题在于,一旦你消费了它的数据,你就不会再回到起点。再加上两个图 运行 并行的事实,看起来它“划分”了两者之间的元素。但实际上那是完全随机的。例如,如果您在 Client AClient B 之间添加 1 秒的休眠,那么将打印的唯一客户端将是 A.

为了完成这项工作,您需要创建一个可逆的来源。例如,SeqList。以下将做:

val myVideoStreamingSource = Seq("A","B","C","D","E")
val source = Source.fromIterator(() => myVideoStreamingSource.zipWithIndex.iterator)