Scala Future.find
Scala 2.12 has two Future.find methods.
@deprecated("use the overloaded version of this method that takes a scala.collection.immutable.Iterable instead", "2.12.0")
def find[T](@deprecatedName('futurestravonce) futures: TraversableOnce[Future[T]])(@deprecatedName('predicate) p: T => Boolean)(implicit executor: ExecutionContext): Future[Option[T]]
and its overloaded version
def find[T](futures: scala.collection.immutable.Iterable[Future[T]])(p: T => Boolean)(implicit executor: ExecutionContext): Future[Option[T]]
Both have the same description:
/** Asynchronously and non-blockingly returns a `Future` that will hold the optional result
* of the first `Future` with a result that matches the predicate, failed `Future`s will be ignored.
*
* @tparam T the type of the value in the future
* @param futures the `scala.collection.immutable.Iterable` of Futures to search
* @param p the predicate which indicates if it's a match
* @return the `Future` holding the optional result of the search
*/
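So I assumed that both methods find the first Future to complete, among those in the given list, whose result matches the predicate p. In practice, only the first (deprecated) one does so: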
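import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

val start = System.currentTimeMillis
// An Iterator is a TraversableOnce, so this call resolves to the deprecated overload.
val a = (1 to 3).reverse.iterator.map { x =>
  Future {
    Thread.sleep(x * 10000)
    x
  }
}
val b = Future.find(a)(_.isInstanceOf[Int])
b.foreach { x =>
  println(x)
  println(System.currentTimeMillis - start) // 10020
}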
The deprecated version of the method returns the result of the fastest Future.
val a = (1 to 3).reverse.map { x =>
  Future {
    Thread.sleep(x * 10000)
    x
  }
}
val b = Future.find(a)(_.isInstanceOf[Int])
b.foreach { x =>
  println(x)
  println(System.currentTimeMillis - start)
}
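The overloaded version returns the result of the slowest one. More precisely, it simply checks the given list from head to tail, regardless of how long each Future takes to complete.
Is this the intended behavior? If so, are using the deprecated method or implementing my own the only options when I care about completion time?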
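You're right: the deprecated Future.find in 2.12.x, which expects a TraversableOnce[Future[T]], does behave differently from its replacement. As the source code pasted below shows, the former find method uses Promise with tryComplete to capture the first completed future in the input collection that satisfies the predicate, whereas the latter walks the collection in order with a simple hasNext/next traversal: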
@deprecated("use the overloaded version of this method that takes a scala.collection.immutable.Iterable instead", "2.12.0")
def find[T](@deprecatedName('futurestravonce) futures: TraversableOnce[Future[T]])(@deprecatedName('predicate) p: T => Boolean)(implicit executor: ExecutionContext): Future[Option[T]] = {
  val futuresBuffer = futures.toBuffer
  if (futuresBuffer.isEmpty) successful[Option[T]](None)
  else {
    val result = Promise[Option[T]]()
    val ref = new AtomicInteger(futuresBuffer.size)
    val search: Try[T] => Unit = v => try {
      v match {
        case Success(r) if p(r) => result tryComplete Success(Some(r))
        case _ =>
      }
    } finally {
      if (ref.decrementAndGet == 0) {
        result tryComplete Success(None)
      }
    }
    futuresBuffer.foreach(_ onComplete search)
    result.future
  }
}

def find[T](futures: scala.collection.immutable.Iterable[Future[T]])(p: T => Boolean)(implicit executor: ExecutionContext): Future[Option[T]] = {
  def searchNext(i: Iterator[Future[T]]): Future[Option[T]] =
    if (!i.hasNext) successful[Option[T]](None)
    else {
      i.next().transformWith {
        case Success(r) if p(r) => successful(Some(r))
        case other => searchNext(i)
      }
    }
  searchNext(futures.iterator)
}
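The in-order traversal of the non-deprecated overload can be observed without long sleeps by controlling completion order with Promises. The following is only a small sketch: the names FindOrderDemo, run, slow, and fast are made up for illustration, and it assumes the call resolves to the immutable.Iterable overload, which should be the case for a List.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FindOrderDemo {
  // Returns (was the search already finished while the first future was pending?, final result).
  def run(): (Boolean, Option[Int]) = {
    val slow = Promise[Int]() // first in iteration order, completed last
    val fast = Promise[Int]() // second in iteration order, completed first

    // A List is an immutable.Iterable, so the non-deprecated overload applies.
    val found = Future.find(List(slow.future, fast.future))(_ > 0)

    fast.success(2)
    Thread.sleep(100) // give any pending callbacks a chance to run
    // The search has not looked at `fast` yet: it is still waiting on `slow`.
    val finishedEarly = found.isCompleted

    slow.success(1)
    (finishedEarly, Await.result(found, 1.second))
  }

  def main(args: Array[String]): Unit =
    println(run()) // prints (false,Some(1))
}
```

Even though the second future finishes first and matches the predicate, the search only completes once the first element has completed, and it reports that first element's value.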
One way to implement your own would be to extend the Future.firstCompletedOf method with an added predicate, along these lines:
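import java.util.concurrent.atomic.AtomicReference
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.Try

def firstConditionallyCompletedOf[T](futures: List[Future[T]])(condition: T => Boolean)(implicit ec: ExecutionContext): Future[T] = {
  val p = Promise[T]()
  // Completes `p` with whichever filtered future finishes first; later completions see null and are ignored.
  val firstCompleteHandler = new AtomicReference[Promise[T]](p) with (Try[T] => Unit) {
    override def apply(v1: Try[T]): Unit = getAndSet(null) match {
      case null => ()
      case some => some tryComplete v1
    }
  }
  // Note: `filter` fails a future whose value does not satisfy `condition`, so such a failure can win the race.
  futures.foreach { _.filter(condition).onComplete(firstCompleteHandler) }
  p.future
}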
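One caveat with building on filter: Future.filter fails the future with a NoSuchElementException when the predicate is not met, so if a non-matching future finishes first, the result completes with that failure. As an alternative, here is a minimal sketch that mirrors the Promise/tryComplete approach of the deprecated find and skips non-matching and failed futures. The names FutureSearch and findFirstCompleted are hypothetical and not part of the standard library.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._
import scala.util.Success

object FutureSearch {
  // Hypothetical helper: completes with the value of the first future to *finish*
  // whose result satisfies `p`, or with None once every future has finished
  // without a match. Failed futures are ignored.
  def findFirstCompleted[T](futures: Seq[Future[T]])(p: T => Boolean)(
      implicit ec: ExecutionContext): Future[Option[T]] =
    if (futures.isEmpty) Future.successful(None)
    else {
      val result = Promise[Option[T]]()
      val remaining = new AtomicInteger(futures.size)
      futures.foreach { f =>
        f.onComplete { v =>
          try {
            v match {
              case Success(r) if p(r) => result.trySuccess(Some(r))
              case _                  => () // failure or non-match: keep waiting
            }
          } finally {
            // The last future to finish settles the result if nothing matched.
            if (remaining.decrementAndGet() == 0) result.trySuccess(None)
          }
        }
      }
      result.future
    }

  def main(args: Array[String]): Unit = {
    import ExecutionContext.Implicits.global
    val slow = Promise[Int]() // first in the list, never completed here
    val fast = Promise[Int]() // second in the list, completed first
    val found = findFirstCompleted(List(slow.future, fast.future))(_ > 0)
    fast.success(2)
    println(Await.result(found, 1.second)) // prints Some(2)
  }
}
```

Unlike the in-order traversal, this returns as soon as any matching future completes, regardless of its position in the collection.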