即使一个或多个失败,如何产生未知数量的 Futures 并合并结果?

How to spawn an unknown amount of Futures and combine the result even if one or more failed?

我想将以下顺序代码转换为带有 Futures 的并发代码,需要有关如何构建它的建议。

顺序:

import java.net.URL

val providers = List(
  new URL("http://www.cnn.com"),
  new URL("http://www.bbc.co.uk"),
  new URL("http://www.othersite.com")
)

def download(urls: URL*) = urls.flatMap(url => io.Source.fromURL(url).getLines).distinct

val res = download(providers:_*)

我想下载通过下载方法的可变参数传入的所有源,并将结果合并为一个 Seq/List/Set,无论如何,一起。当一个 Future 失败时,比方说因为服务器无法访问,它应该接受所有其他 Future 并继续前进并且 return 结果仍然存在。 firstCompletedOf 将不起作用,因为我需要所有的结果,除了一个因错误而失败。我想过像下面那样使用 Future.sequence 但我无法让它工作。这是我尝试过的...

def download(urls: URL*) = Future.sequence {
  urls.map { url =>
    Future {
      io.Source.fromURL(url).getLines
    }
  }
} 

这会产生一个与 M_[Future[A_]] 不兼容的 Seq[Future[Iterator[String]]]。

A Future[Iterator[String]] 就是我想要的。 (我认为我 return 是一个迭代器,因为稍后我需要在迭代器上使用重置方法重用它。)

您可以使用 parallel collections:

import java.net.URL

val providers = List(
  new URL("http://www.cnn.com"),
  new URL("http://www.bbc.co.uk"),
  new URL("http://www.othersite.com")
)

def download(urls: URL*) = urls.par.flatMap(url => {
  Try {
    io.Source.fromURL(url).getLines
  } match {
    case Success(e) => e
    case Failure(_) => Seq()
  }
}).toSeq

val res: Seq[String] = download(providers:_*)

或者,如果您想要带有 Future 的非阻塞版本:

def download(urls: URL*) = Future {
  blocking {
    urls.par.flatMap(url => {
      Try {
        io.Source.fromURL(url).getLines
      } match {
        case Success(e) => e
        case Failure(_) => Seq()
      }
    })
  }
}

val res: Future[Seq[String]] = download(providers:_*)