Scala 的 Future 和 ExecutionContext 执行

Scala's Future and ExecutionContext Execution

假设我有以下一组代码在 Future 中执行某些操作:

1 to 10 foreach {
  case x => Future { x + x }
}

假设我给这段代码默认的ExecutionContext,我知道后台发生了什么,但我想知道的是Future的处理实际上是怎么做的?我的意思是应该有一些线程或一组线程可能正在等待 Future 完成?这些线程被阻塞了吗?在他们真正等待 Future 完成的意义上被阻塞?

现在在以下场景中:

val x: Future[MyType] = finishInSomeFuture()

假设 x 有一个超时,我可以这样调用:

Future {
  blocking {
    x.get(3, TimeOut.SECONDS)
  }
} 

我真的挡了吗?有没有更好的异步超时方法?

编辑:以下超时与我在上面定义的阻塞上下文有何不同或更好?

object TimeoutFuture {
  def apply[A](timeout: FiniteDuration)(block: => A): Future[A] = {

    val prom = promise[A]

    // timeout logic
    Akka.system.scheduler.scheduleOnce(timeout) {
      prom tryFailure new java.util.concurrent.TimeoutException
    }

    // business logic
    Future { 
      prom success block
    }

    prom.future
  } 
}
  1. Let's say I have the following set of code that does something in a Future:

    1 to 10 foreach {
      case x => Future { x + x }
    }
    

    ...

您的代码片段创建了 10 个 Futures,它们会立即设置为使用隐式 ExecutionContext 提供的线程执行。由于您不存储对期货的引用,并且不等待它们的执行,因此您的主线程(定义 foreach 的地方)不会阻塞并立即继续执行。如果那段代码在 main 方法的末尾,那么,根据是否 ThreadFactory in ExecutionContext 产生的 daemon 线程程序可能会退出而不等待 Futures 完成.

  1. Now in the following scenario:

    val x: Future[MyType] = finishInSomeFuture()
    

    Assuming that x has a timeout that I can call like this:

    Future {
      blocking {
        x.get(3, TimeOut.SECONDS)
      }
    } 
    

    Am I really blocking? Is there a better way to timeout asynchronously?

你的意思可能是 Await.result 而不是 x.get:

def inefficientTimeoutFuture[T](f:Future[T], x:Duration) = Future { Await.result(f, x) }

在这种情况下,未来的 f 将在单独的线程中计算,而其他线程将被阻塞等待计算 f

Using scheduler to create TimeoutFuture 效率更高,因为调度程序通常共享固定数量的线程(通常是一个),而 Await.result 中的阻塞总是需要额外的线程来阻塞。

  1. I would like to know how I could timeout without blocking?

使用调度程序创建 TimeoutFuture 允许您在不阻塞的情况下超时操作。您将 Future 包装在超时助手中,新的 Future 要么成功完成,要么由于超时而失败(以先到者为准)。新的 Future 具有相同的异步性质,如何使用它取决于您(注册 onComplete 回调或同步等待结果,阻塞主线程)。


UPD 我将尝试阐明有关多线程和阻塞的一些基本知识。

现在异步非阻塞方法是趋势,但你必须了解阻塞意味着什么以及为什么要避免它。

Java 中的每个线程都需要付费。首先,创建新线程的成本相对较高(这就是线程池存在的原因),其次,它会消耗内存。为什么不 CPU?因为您的 CPU 资源受限于您拥有的内核数量。无论您有多少个活动线程,您的并行度级别将始终受到内核数量的限制。如果线程处于非活动状态(阻塞),它不会消耗 CPU。

在现代 java 应用程序中,您可以创建相当多的线程(数千个)。问题是在某些情况下您无法预测需要多少个线程。这就是异步方法发挥作用的时候。它说:与其在其他线程执行任务时阻塞当前线程,不如让我们将下一步的回调和 return 当前线程包装到池中,这样它就可以做一些其他有用的工作。所以几乎所有的线程都忙于做实际工作,而不是等待和消耗内存。

现在以计时器为例。如果你使用基于 netty 的 HashedWheelTimer,你可以让它由单线程支持,并安排数千个事件。当您创建因等待超时而阻塞的 Future 时,每个 "schedule" 占用一个线程。因此,如果您安排了一千个超时,您最终将得到一千个阻塞线程(这再次消耗内存,而不是 cpu)。

现在你的 "main" 未来(你想在超时中包装)也不必阻塞线程。比如你在future内部进行同步http请求,你的线程会被阻塞,但是如果你使用netty-based AsyncHttpClient(例如),你可以使用不占用线程的promise-based future线。在这种情况下,您可以拥有少量固定数量的线程来处理任意数量的请求(数十万)。


UPD2

But there should be some thread that should be blocking even in case of the Timer as it has to wait for the Timeout millis. So what and where is the benefit? I still block, but may be I block less in the Timer case or?

这仅适用于一种特定情况:当您有主线程等待异步任务完成时。在这种情况下你是对的,没有办法在不阻塞主线程的情况下将操作包装在超时中。在这种情况下使用定时器没有任何意义。你只需要额外的线程来执行你的操作,而主线程等待结果或超时。

但通常Futures用于更复杂的场景,没有"main"线程。例如,想象一下异步网络服务器,请求进来,你创建 Future 来处理它并注册回调来回复。没有 "main" 线程等待任何事情。

或者另一个例子,您想要向外部服务发出 1000 个请求,并设置各自的超时时间,然后将所有结果收集到一个地方。如果您有该服务的异步客户端,您可以创建 1000 个请求,将它们包装在异步超时中,然后组合成一个 Future。您可以阻塞主线程以等待未来完成或注册回调以打印结果,但您不必创建 1000 个线程来等待每个单独的请求完成。

所以,关键是:如果你已经有了同步流,并且你想在超时中包装它的一部分,你唯一能做的就是让你当前的线程阻塞,直到其他线程执行工作。 如果你想避免阻塞,你需要从一开始就使用异步方法。