在 scala 中将 AsyncStream[A] 强制转换为 Seq[A]

Forcing an AsyncStream[A] into a Seq[A] in scala

我专门使用 Twitter 的 AsyncStream,我需要获取并发处理的结果,并将其放入 Seq,但我设法开始工作的唯一方法是可怕的。这感觉应该是一个单行,但我对 Await 和 force 的所有尝试都挂起或未处理工作。

这是我的工作 - 更惯用的方法是什么?

  def processWork(work: AsyncStream[Work]): Seq[Result] = {
    // TODO: make less stupid
    val resultStream = work.flatMap { processWork }
    var results : Seq[Result] = Nil
    resultStream.foreach {
      result => {
        results = results :+ result
      }
    }
    results
  }

就像@MattFowler 指出的那样 - 您可以强制流并等待它完成使用:

Await.result(resultStream.toSeq, 1.second)

toSeq 将开始 实现 流和 return 一个 Future[Seq[A]] 一旦所有元素都被解析就完成,如记录 here.

然后您可以使用Await.result阻止直到未来完成。

确保您的流是有限的!调用 Await.result(resultStream.toSeq, 1.second) 会永远挂起。