当子流中的任何一个值准备就绪时合并流

Merging streams when _any_ of the substreams has a value ready

从 Akka-stream 文档来看,所有流合并选项(merge、mergeSorted、mergePreferred、zipN、zipWithN)似乎都是通过等待所有合并流都准备好新元素,然后应用合并策略(组合元组中的元素,或应用 zip 函数等)

这适用于离线处理(例如从文件或 HTTP 读取数据并将其组合),但它会在在线处理中引入延迟。我需要合并由例如产生的数据流多个 Websocket 连接,并在任何源流产生值时立即在合并流中传递更新。示例:如果有源流 A 和 B,合并流中应包含以下内容:

输出流以一些初始值开始,例如(None, None).

(A:1) (B:<not ready>) -> (Some(1), None)
(A:2) (B:<not ready>) -> (Some(2), None)
(A:3) (B:1)           -> (Some(3), Some(1))
(A:3) (B:2)           -> (Some(3), Some(2))

等同样,当源流的 any 立即产生一个值时,新值出现在输出流中。

是否有任何组合器可以实现这一点?

如评论中所述,MergeMergePreferred 阶段确实会向下游发出元素,即使并非所有上游都有可用元素。

从您的示例来看,您似乎正在寻找压缩源。是的,Zip 仅当它具有要从其所有上游压缩的元素时才向下游发出压缩元组。为了克服这个问题,你可以 'lift' 你的源来产生 Options,并让它们在没有其他东西可以发射的时候发射 None。源包装器可能如下所示:

  def asOption[In, Mat](source: Source[In, Mat]): Source[Option[In], Mat] =
    Source.fromGraph(GraphDSL.create(source.map(Option(_))) {
      implicit builder: GraphDSL.Builder[Mat] => src =>
      import GraphDSL.Implicits._

      val noneSource = Source.repeat(None)
      val merge = builder.add(MergePreferred[Option[In]](1))

      src        ~> merge.preferred
      noneSource ~> merge.in(0)

      SourceShape(merge.out)
    })

此时您可以像往常一样压缩源代码。

  val src1: Source[Int, NotUsed] = ???
  val src2: Source[Int, NotUsed] = ???

  val zipped = asOption(src1) zip asOption(src2)