在 scala 中将 AsyncStream[A] 强制转换为 Seq[A]
Forcing an AsyncStream[A] into a Seq[A] in scala
我专门使用 Twitter 的 AsyncStream,我需要获取并发处理的结果,并将其放入 Seq,但我设法开始工作的唯一方法是可怕的。这感觉应该是一个单行,但我对 Await 和 force 的所有尝试都挂起或未处理工作。
这是我的工作 - 更惯用的方法是什么?
def processWork(work: AsyncStream[Work]): Seq[Result] = {
// TODO: make less stupid
val resultStream = work.flatMap { processWork }
var results : Seq[Result] = Nil
resultStream.foreach {
result => {
results = results :+ result
}
}
results
}
就像@MattFowler 指出的那样 - 您可以强制流并等待它完成使用:
Await.result(resultStream.toSeq, 1.second)
toSeq
将开始 实现 流和 return 一个 Future[Seq[A]]
一旦所有元素都被解析就完成,如记录 here.
然后您可以使用Await.result
阻止直到未来完成。
确保您的流是有限的!调用 Await.result(resultStream.toSeq, 1.second)
会永远挂起。
我专门使用 Twitter 的 AsyncStream,我需要获取并发处理的结果,并将其放入 Seq,但我设法开始工作的唯一方法是可怕的。这感觉应该是一个单行,但我对 Await 和 force 的所有尝试都挂起或未处理工作。
这是我的工作 - 更惯用的方法是什么?
def processWork(work: AsyncStream[Work]): Seq[Result] = {
// TODO: make less stupid
val resultStream = work.flatMap { processWork }
var results : Seq[Result] = Nil
resultStream.foreach {
result => {
results = results :+ result
}
}
results
}
就像@MattFowler 指出的那样 - 您可以强制流并等待它完成使用:
Await.result(resultStream.toSeq, 1.second)
toSeq
将开始 实现 流和 return 一个 Future[Seq[A]]
一旦所有元素都被解析就完成,如记录 here.
然后您可以使用Await.result
阻止直到未来完成。
确保您的流是有限的!调用 Await.result(resultStream.toSeq, 1.second)
会永远挂起。