基于生成器的协程看似无限递归

Seemingly infinite recursion with generator based coroutines

以下内容摘自 David Beazley 关于发电机的幻灯片(here 感兴趣的人)。

A Task class 被定义,它包装了一个生成期货的生成器,Task class,完整(w/o 错误处理),如下:

class Task:
    def __init__(self, gen):
        self._gen = gen

    def step(self, value=None):
        try:
            fut = self._gen.send(value)
            fut.add_done_callback(self._wakeup)
        except StopIteration as exc:
            pass

    def _wakeup(self, fut):
        result = fut.result()
        self.step(result)

在一个例子中,还定义了以下递归函数:

from concurrent.futures import ThreadPoolExecutor
import time

pool = ThreadPoolExecutor(max_workers=8)

def recursive(n):
   yield pool.submit(time.sleep, 0.001)
   print("Tick :", n)
   Task(recursive(n+1)).step()

以下两种情况发生:

  1. 在 Python REPL 中,如果我们定义这些(或者如果我们将它们放在文件中则导入它们)然后使用以下命令启动递归:

    Task(recursive(0)).step()
    

    它开始打印,似乎已经超出了递归限制。它 显然 并没有超过它,打印堆栈级别表明它在整个执行过程中保持不变。其他事情正在发生,我不太明白。

    注意:如果你这样执行,你需要终止python进程。

  2. 如果我们将所有内容(Taskrecursive)与以下文件一起放入文件中:

    if __name__ == "__main__":
        Task(recursive(0)).step()
    

    然后 运行 它与 python myfile.py,它在 7 停止滴答(似乎是 max_workers 的数字)。


我的问题是它怎么看起来超过了递归限制,为什么它的行为会根据你的执行方式而有所不同?

该行为出现在 Python 3.6.2 和 Python 3.5.4 上(我猜 3.63.5 系列中的其他人也一样) .

让我们从数字 7 开始。那就是你已经提到的工人数量,标记为 [0..7]Task class 需要以函数标识符的形式传递 recursive

Task(recursive).step(n) 

而不是

Task(recursive(n)).step()

这是因为 recursive 函数需要在 pool 环境中调用,而在当前情况下 recursive 是在主线程本身中计算的。 time.sleep 是当前代码中唯一在任务池中评估的函数。

代码存在重大问题的一个关键方面是递归。池中的每个线程都依赖于将执行上限设置为可用工作线程数的内部函数。该功能无法完成,因此无法执行新功能。因此,它在达到递归限制之前就终止了。

您显示的 recursive 生成器实际上不是以会导致系统递归限制问题的方式递归。

理解为什么需要注意当recursive生成器的代码运行s。与普通函数不同,仅调用 recursive(0) 不会导致它立即 运行 其代码并进行额外的递归调用。相反,立即调用 recursive(0) returns 生成器对象。只有当您 send() 到生成器时才会执行代码 运行,并且只有在您第二次向它 send() 之后它才会启动另一个调用。

让我们检查调用堆栈作为代码 运行s。在顶层,我们 运行 Task(recursive(0)).step()。这按顺序做了三件事:

  1. recursive(0) 这会立即调用 returns 生成器对象。
  2. Task(_)创建了Task对象,其__init__方法存储了对第一步创建的生成器对象的引用
  3. _.step() 任务上的一个方法被调用。这是行动真正开始的地方!让我们看看调用内部发生了什么:

    • fut = self._gen.send(value) 这里我们实际上启动了生成器 运行ning,通过向它发送一个值。让我们深入看看生成器代码运行:
      • yield pool.submit(time.sleep, 0.001) 这安排了在另一个线程中完成的事情。我们不会等待它发生。相反,我们得到一个 Future ,我们可以用它在完成时得到通知。我们立即让出未来回到上一层代码。
    • fut.add_done_callback(self._wakeup) 在这里,我们要求在未来准备就绪时调用我们的 _wakeup() 方法。这总是returns立即!
    • step方法到此结束。没错,我们完成了(暂时)!这对你问题的第二部分很重要,我稍后会详细讨论。
  4. 我们的调用结束了,所以如果我们运行以交互方式进行,那么控制流returns到REPL。如果我们 运行ning 作为脚本,解释器将到达脚本的末尾并开始关闭(我将在下面详细讨论)。但是,线程池控制的其他线程仍在 运行ning 中,并且在某些时候,其中一个线程将执行一些我们关心的事情!让我们看看那是什么。

  5. 当计划函数 (time.sleep) 完成 运行ning 时,它所在的线程 运行ning 将调用我们在 运行ning 中设置的回调=26=]对象。也就是说,它将在我们之前创建的 Task 对象上调用 Task._wakup()(我们在顶层不再有引用,但是 Future 保留了一个引用,所以它是还活着)。来看看方法:

    • result = fut.result() 存储延迟调用的结果。在这种情况下这是无关紧要的,因为我们从不查看结果(反正是 None)。
    • self.step(result) 再上一步!现在我们回到我们关心的代码。让我们看看这次它做了什么:
      • fut = self._gen.send(value) 再次发送给生成器,所以它接管了。它已经产生了一次,所以这次我们在 yield 之后开始:
        • print("Tick :", n)这个很简单
        • Task(recursive(n+1)).step() 这就是事情变得有趣的地方。这条线就像我们开始的那样。所以,和以前一样,这将 运行 我上面列出的逻辑 1-4(包括它们的子步骤)。但是当 step() 方法 returns 时,它并没有返回到 REPL 或结束脚本,而是返回到这里。
        • recursive() 生成器(原始生成器,不是我们刚刚创建的新生成器)已结束。因此,就像任何到达其代码末尾的生成器一样,它会引发 StopIteration.
      • StopIterationtry/except块捕获并忽略,step()方法结束。
    • _wakup()方法也结束了,所以回调完成。
  6. 最终,在之前的回调中创建的 Task 的回调也将被调用。所以我们返回并重复第 5 步,一遍又一遍,直到永远(如果我们 运行 以交互方式进行)。

上面的调用堆栈解释了为什么交互式案例永远打印。 REPL 的主线程 returns(如果你能看到其他线程的输出,你可以用它做其他事情)。但是在池中,每个线程都从自己作业的回调中调度另一个作业。当下一个作业完成时,它的回调会安排另一个作业,依此类推。

那么,当您 运行 将代码作为脚本时,为什么只得到 8 个打印输出?答案已在上面的第 4 步中暗示。当 运行 以非交互方式运行时,主线程 运行 在第一次调用 Task.step returns 后结束脚本。这会提示解释器尝试关闭。

concurrent.futures.thread 模块(其中定义了 ThreadPoolExecutor)有一些奇特的逻辑,当程序关闭而执行程序仍处于活动状态时,它会尝试很好地清理。它应该停止任何空闲线程,并在当前作业完成时向仍然 运行 发出停止信号。

该清理逻辑的确切实现以一种非常奇怪的方式与我们的代码交互(可能有也可能没有错误)。结果是第一个线程不断给自己更多的工作要做,而生成的其他工作线程在生成后立即退出。当执行者启动了它想要使用的线程数(在我们的例子中为 8 个)时,第一个工作人员最终退出。

这是我理解的事件顺序。

  1. 我们(间接)导入 concurrent.futures.thread 模块,它使用 atexit 告诉解释器 运行 一个名为 _python_exit 的函数,就在解释器关闭之前。
  2. 我们创建了一个最大线程数为 8 的 ThreadPoolExecutor。它不会立即生成其工作线程,但会在每次安排作业时创建一个,直到它拥有所有 8 个。
  3. 我们安排了我们的第一份工作(在前面列表中步骤 3 的深层嵌套部分)。
  4. 执行器将作业添加到其内部队列,然后注意到它没有最大数量的工作线程并启动一个新线程。
  5. 新线程将作业从队列中弹出并开始 运行 处理它。但是,sleep 调用比其余步骤花费的时间长得多,因此线程将在这里卡住一点。
  6. 主线程完成(已到达前面列表中的第 4 步)。
  7. _python_exit 函数被解释器调用,因为解释器想要关闭。该函数在模块中设置一个全局_shutdown变量,并向执行器的内部队列发送一个None(每个线程发送一个None,但只有一个线程被创建所以远,所以它只发送一个 None)。然后它会阻塞主线程,直到它知道的线程退出。这会延迟解释器关闭。
  8. 工作线程对time.sleep的调用returns。它调用在其作业 Future 中注册的回调函数,该函数会安排另一个作业。
  9. 与此列表的第 4 步一样,执行程序将作业排队,并启动另一个线程,因为它还没有所需的数量。
  10. 新线程尝试从内部队列中获取作业,但从第 7 步中获取 None 值,这是一个可能完成的信号。它看到 _shutdown 全局已设置,因此它退出。不过,在此之前,它会向队列中添加另一个 None
  11. 第一个工作线程完成回调。它寻找一份新工作,并找到它在第 8 步中自行排队的工作。它开始 运行 处理该工作,就像第 5 步一样,这需要一段时间。
  12. 虽然没有其他事情发生,因为第一个 worker 是目前唯一活动的线程(主线程被阻塞等待第一个 worker 死亡,另一个 worker 自行关闭)。
  13. 我们现在重复步骤 8-12 几次。第一个工作线程将第三个到第八个作业排队,并且执行程序每次都生成一个相应的线程,因为它没有完整的集合。但是,每个线程都会立即终止,因为它从作业队列中获得 None 而不是要完成的实际作业。第一个工作线程最终完成所有实际工作。
  14. 终于,在第 8 份工作之后,有些东西有所不同。这一次,当回调安排另一个作业时,不会产生额外的线程,因为执行程序知道它已经启动了请求的 8 个线程(它不知道 7 个线程已经关闭)。
  15. 所以这一次,位于内部作业队列头部的 None 被第一个工作人员(而不是实际作业)拾取。这意味着它会关闭,而不是做更多的工作。
  16. 当第一个 worker 关闭时,主线程(一直在等待它退出)终于可以解除阻塞并且 _python_exit 函数完成。这让解释器完全关闭。我们完成了!

这解释了我们看到的输出!我们得到 8 个输出,全部来自同一个工作线程(第一个产生)。

我认为该代码中可能存在竞争条件。如果第 11 步发生在第 10 步之前,事情可能会中断。如果第一个工人从队列中得到 None 而另一个新产生的工人得到了真正的工作,那么他们会交换角色(第一个工人会死,另一个人会做剩下的工作,除非这些步骤的更高版本中有更多竞争条件)。但是,一旦第一个 worker 死亡,主线程就会被解除阻塞。因为它不知道其他线程(因为当它列出要等待的线程时它们不存在),它会过早地关闭解释器。

我不确定这场比赛是否有可能发生。我猜这不太可能,因为新线程开始和它从队列中抓取作业之间的代码路径长度比现有线程完成回调的路径(它排队后的部分)短得多新工作)然后在队列中寻找另一个工作。

我怀疑这是一个错误,当我们 运行 我们的代码作为脚本时,ThreadPoolExecutor 让我们干净地退出。除了执行程序自己的 self._shutdown 属性之外,排队新作业的逻辑可能还应该检查全局 _shutdown 标志。如果是这样,在主线程完成后尝试排队另一个作业会引发异常。

您可以通过在 with 语句中创建 ThreadPoolExecutor 来复制我认为更明智的行为:

# create the pool below the definition of recursive()
with ThreadPoolExecutor(max_workers=8) as pool:
    Task(recursive(0)).step()

这将在主线程 returns 从 step() 调用后很快崩溃。它看起来像这样:

exception calling callback for <Future at 0x22313bd2a20 state=finished returned NoneType>
Traceback (most recent call last):
  File "S:\python36\lib\concurrent\futures\_base.py", line 324, in _invoke_callbacks
    callback(self)
  File ".\task_coroutines.py", line 21, in _wakeup
    self.step(result)
  File ".\task_coroutines.py", line 14, in step
    fut = self._gen.send(value)
  File ".\task_coroutines.py", line 30, in recursive
    Task(recursive(n+1)).step()
  File ".\task_coroutines.py", line 14, in step
    fut = self._gen.send(value)
  File ".\task_coroutines.py", line 28, in recursive
    yield pool.submit(time.sleep, 1)
  File "S:\python36\lib\concurrent\futures\thread.py", line 117, in submit
    raise RuntimeError('cannot schedule new futures after shutdown')
RuntimeError: cannot schedule new futures after shutdown