使用 Akka Stream,如何动态复制流?
With Akka Stream, how to dynamically duplicate a flow?
我是 运行 直播视频流服务器。有一个 Array[Byte]
视频源。请注意,我无法与我的视频源建立 2 个连接。我希望连接到我的服务器的每个客户端都接收到相同的流,缓冲区丢弃旧帧。
我试过像这样使用 BroadcastHub
:
val source =
Source.fromIterator(() => myVideoStreamingSource.zipWithIndex)
val runnableGraph =
source.toMat(BroadcastHub.sink(bufferSize = 2))(Keep.right)
runnableGraph.run().to(Sink.foreach { index =>
println(s"client A reading frame #$index")
}).run()
runnableGraph.run().to(Sink.foreach { index =>
println(s"client B reading frame #$index")
}).run()
我得到:
client A reading frame #0
client B reading frame #1
client A reading frame #2
client B reading frame #3
我们看到主流在两个客户端之间进行了分区,而我希望我的两个客户端能够看到所有源流的帧。
我是不是遗漏了什么,或者还有其他解决方案吗?
问题是 Iterator
与 BroadcastHub
的组合。我假设你 myVideoStreamingSource
是这样的:
val myVideoStreamingSource = Iterator("A","B","C","D","E")
我现在引用 BroadcastHub.Sink
:
Every new materialization of the [[Sink]] results in a new, independent hub, which materializes to its own [[Source]] for consuming the [[Sink]] of that materialization.
这里的问题是它还没有使用迭代器中的数据。
迭代器的问题在于,一旦你消费了它的数据,你就不会再回到起点。再加上两个图 运行 并行的事实,看起来它“划分”了两者之间的元素。但实际上那是完全随机的。例如,如果您在 Client A
和 Client B
之间添加 1 秒的休眠,那么将打印的唯一客户端将是 A
.
为了完成这项工作,您需要创建一个可逆的来源。例如,Seq
或 List
。以下将做:
val myVideoStreamingSource = Seq("A","B","C","D","E")
val source = Source.fromIterator(() => myVideoStreamingSource.zipWithIndex.iterator)
我是 运行 直播视频流服务器。有一个 Array[Byte]
视频源。请注意,我无法与我的视频源建立 2 个连接。我希望连接到我的服务器的每个客户端都接收到相同的流,缓冲区丢弃旧帧。
我试过像这样使用 BroadcastHub
:
val source =
Source.fromIterator(() => myVideoStreamingSource.zipWithIndex)
val runnableGraph =
source.toMat(BroadcastHub.sink(bufferSize = 2))(Keep.right)
runnableGraph.run().to(Sink.foreach { index =>
println(s"client A reading frame #$index")
}).run()
runnableGraph.run().to(Sink.foreach { index =>
println(s"client B reading frame #$index")
}).run()
我得到:
client A reading frame #0
client B reading frame #1
client A reading frame #2
client B reading frame #3
我们看到主流在两个客户端之间进行了分区,而我希望我的两个客户端能够看到所有源流的帧。
我是不是遗漏了什么,或者还有其他解决方案吗?
问题是 Iterator
与 BroadcastHub
的组合。我假设你 myVideoStreamingSource
是这样的:
val myVideoStreamingSource = Iterator("A","B","C","D","E")
我现在引用 BroadcastHub.Sink
:
Every new materialization of the [[Sink]] results in a new, independent hub, which materializes to its own [[Source]] for consuming the [[Sink]] of that materialization.
这里的问题是它还没有使用迭代器中的数据。
迭代器的问题在于,一旦你消费了它的数据,你就不会再回到起点。再加上两个图 运行 并行的事实,看起来它“划分”了两者之间的元素。但实际上那是完全随机的。例如,如果您在 Client A
和 Client B
之间添加 1 秒的休眠,那么将打印的唯一客户端将是 A
.
为了完成这项工作,您需要创建一个可逆的来源。例如,Seq
或 List
。以下将做:
val myVideoStreamingSource = Seq("A","B","C","D","E")
val source = Source.fromIterator(() => myVideoStreamingSource.zipWithIndex.iterator)