为什么 Future.firstCompletedOf 不在超时时调用回调?

Why does Future.firstCompletedOf not invoke callback on timeout?

我正在做 Learning Concurrent Programming in Scala 的练习。

对于代码注释中的练习题。

  1. 程序打印 HTML 内容的正确输出以获得正确的 URL 和足够的超时。
  2. 程序打印 "Error occured" 以获得适当的 URL 和低超时。

但是对于无效的 URL "Error occured" 不打印。下面的代码有什么问题?

/*
 * Implement a command-line program that asks the user to input a URL of some website, 
 * and displays the HTML of that website. Between the time that the user hits ENTER and 
 * the time that the HTML is retrieved, the program should repetitively print a . to the 
 * standard output every 50 milliseconds, with a two seconds timeout. Use only futures 
 * and promises, and avoid the synchronization primitives from the previous chapters. 
 * You may reuse the timeout method defined in this chapter.
 */
object Excersices extends App {

  val timer = new Timer()

  def timeout(t: Long = 1000): Future[Unit] = {
    val p = Promise[Unit]
    val timer = new Timer(true)
    timer.schedule(new TimerTask() {
      override def run() = {
        p success ()
        timer cancel()
      }
    }, t)
    p future
  }

  def printDot = println(".")

  val taskOfPrintingDot = new TimerTask {
    override def run() = printDot
  }

  println("Enter a URL")
  val lines = io.Source.stdin.getLines()
  val url = if (lines hasNext) Some(lines next) else None

  timer.schedule(taskOfPrintingDot, 0L, 50.millisecond.toMillis)
  val timeOut2Sec = timeout(2.second.toMillis)

  val htmlContents = Future {
    url map { x =>
      blocking {
        Source fromURL (x) mkString
      }
    }
  }

  Future.firstCompletedOf(Seq(timeOut2Sec, htmlContents)) map { x =>
    timer cancel ()
    x match {
      case Some(x) =>
        println(x)
      case _ =>
        println("Error occured")
    }

  }

  Thread sleep 5000
}

正如@Gábor Bakos 所说,异常会产生 Failure,它不会被 map 处理:

val fut = Future { Some(Source fromURL ("hhhttp://google.com")) }

scala> fut  map { x => println(x) } //nothing printed
res12: scala.concurrent.Future[Unit] = scala.concurrent.impl.Promise$DefaultPromise@5e025724

要处理失败 - 使用 recover 方法:

scala> fut recover { case failure => None } map { x => println(x) } 
None
res13: scala.concurrent.Future[Unit] = scala.concurrent.impl.Promise$DefaultPromise@578afc83

在您的上下文中,它类似于:

Future.firstCompletedOf(Seq(timeOut2Sec, htmlContents)) recover {case x => println("Error:" + x); None} map { x => ...}

按照@dk14 的建议使用恢复后的完整代码:

object Exercises extends App {

  val timer = new Timer()
  def timeout(t: Long = 1000): Future[Unit] = {
    val p = Promise[Unit]
    val timer = new Timer(true)
    timer.schedule(new TimerTask() {
      override def run() = {
        p success ()
        timer cancel ()
      }
    }, t)
    p future
  }

  def printDot = println(".")

  val taskOfPrintingDot = new TimerTask {
    override def run() = {
      printDot
    }
  }

  println("Enter a URL")
  val lines = io.Source.stdin.getLines()
  val url = if (lines hasNext) Some(lines next) else None

  timer.schedule(taskOfPrintingDot, 0L, 50.millisecond.toMillis)
  val timeOut2Sec = timeout(2.second.toMillis)

  val htmlContents = Future {
    url map { x =>
      blocking {
        Source fromURL (x) mkString
      }
    }
  }

  Future.firstCompletedOf(Seq(timeOut2Sec, htmlContents))
    .recover { case x => println("Error:" + x); None }
    .map { x =>
      timer cancel ()
      x match {
        case Some(x) =>
          println(x)
        case _ =>
          println("Timeout occurred")
      }
    }

  Thread sleep 5000
}