当子流中的任何一个值准备就绪时合并流
Merging streams when _any_ of the substreams has a value ready
从 Akka-stream 文档来看,所有流合并选项(merge、mergeSorted、mergePreferred、zipN、zipWithN)似乎都是通过等待所有合并流都准备好新元素,然后应用合并策略(组合元组中的元素,或应用 zip 函数等)
这适用于离线处理(例如从文件或 HTTP 读取数据并将其组合),但它会在在线处理中引入延迟。我需要合并由例如产生的数据流多个 Websocket 连接,并在任何源流产生值时立即在合并流中传递更新。示例:如果有源流 A 和 B,合并流中应包含以下内容:
输出流以一些初始值开始,例如(None, None)
.
(A:1) (B:<not ready>) -> (Some(1), None)
(A:2) (B:<not ready>) -> (Some(2), None)
(A:3) (B:1) -> (Some(3), Some(1))
(A:3) (B:2) -> (Some(3), Some(2))
等同样,当源流的 any 立即产生一个值时,新值出现在输出流中。
是否有任何组合器可以实现这一点?
如评论中所述,Merge
和 MergePreferred
阶段确实会向下游发出元素,即使并非所有上游都有可用元素。
从您的示例来看,您似乎正在寻找压缩源。是的,Zip
仅当它具有要从其所有上游压缩的元素时才向下游发出压缩元组。为了克服这个问题,你可以 'lift' 你的源来产生 Option
s,并让它们在没有其他东西可以发射的时候发射 None
。源包装器可能如下所示:
def asOption[In, Mat](source: Source[In, Mat]): Source[Option[In], Mat] =
Source.fromGraph(GraphDSL.create(source.map(Option(_))) {
implicit builder: GraphDSL.Builder[Mat] => src =>
import GraphDSL.Implicits._
val noneSource = Source.repeat(None)
val merge = builder.add(MergePreferred[Option[In]](1))
src ~> merge.preferred
noneSource ~> merge.in(0)
SourceShape(merge.out)
})
此时您可以像往常一样压缩源代码。
val src1: Source[Int, NotUsed] = ???
val src2: Source[Int, NotUsed] = ???
val zipped = asOption(src1) zip asOption(src2)
从 Akka-stream 文档来看,所有流合并选项(merge、mergeSorted、mergePreferred、zipN、zipWithN)似乎都是通过等待所有合并流都准备好新元素,然后应用合并策略(组合元组中的元素,或应用 zip 函数等)
这适用于离线处理(例如从文件或 HTTP 读取数据并将其组合),但它会在在线处理中引入延迟。我需要合并由例如产生的数据流多个 Websocket 连接,并在任何源流产生值时立即在合并流中传递更新。示例:如果有源流 A 和 B,合并流中应包含以下内容:
输出流以一些初始值开始,例如(None, None)
.
(A:1) (B:<not ready>) -> (Some(1), None)
(A:2) (B:<not ready>) -> (Some(2), None)
(A:3) (B:1) -> (Some(3), Some(1))
(A:3) (B:2) -> (Some(3), Some(2))
等同样,当源流的 any 立即产生一个值时,新值出现在输出流中。
是否有任何组合器可以实现这一点?
如评论中所述,Merge
和 MergePreferred
阶段确实会向下游发出元素,即使并非所有上游都有可用元素。
从您的示例来看,您似乎正在寻找压缩源。是的,Zip
仅当它具有要从其所有上游压缩的元素时才向下游发出压缩元组。为了克服这个问题,你可以 'lift' 你的源来产生 Option
s,并让它们在没有其他东西可以发射的时候发射 None
。源包装器可能如下所示:
def asOption[In, Mat](source: Source[In, Mat]): Source[Option[In], Mat] =
Source.fromGraph(GraphDSL.create(source.map(Option(_))) {
implicit builder: GraphDSL.Builder[Mat] => src =>
import GraphDSL.Implicits._
val noneSource = Source.repeat(None)
val merge = builder.add(MergePreferred[Option[In]](1))
src ~> merge.preferred
noneSource ~> merge.in(0)
SourceShape(merge.out)
})
此时您可以像往常一样压缩源代码。
val src1: Source[Int, NotUsed] = ???
val src2: Source[Int, NotUsed] = ???
val zipped = asOption(src1) zip asOption(src2)