Python 线程池 - 创建子任务并等待它们的任务

Python thread pools - tasks that create subtasks and wait on them

假设我有一个最大线程池执行器。 10 个线程,我向它提交一个任务,它本身创建另一个任务,然后等待它完成,递归地直到我达到 11 的深度。

Python中的示例代码:

import concurrent.futures

e = concurrent.futures.ThreadPoolExecutor(max_workers=10)

def task(depth):
    print 'started depth %d' % (depth, )
    if depth > 10:
        return depth
    else:
        f = e.submit(task, depth + 1)
        concurrent.futures.wait([f])


f = e.submit(task, 0)
print f.result()

以上代码输出:

started depth 0
started depth 1
started depth 2
started depth 3
started depth 4
started depth 5
started depth 6
started depth 7
started depth 8
started depth 9

和死锁。

有什么方法可以在不创建额外的线程和执行器的情况下解决这个问题吗?

换句话说,工作线程在等待时处理其他任务的方法?

不,如果你想避免死锁,你不能等待任务中同一个执行者的未来。

在此示例中,您唯一可以做的就是 return 未来,然后递归处理结果:

import concurrent.futures
import time

e = concurrent.futures.ThreadPoolExecutor(max_workers=10)

def task(depth):
    print 'started depth %d' % (depth, )
    if depth > 10:
        return depth
    else:
        f = e.submit(task, depth + 1)
        return f


f = e.submit(task, 0)
while isinstance(f.result(), concurrent.futures.Future):
    f = f.result()

print f.result()

然而,最好首先避免这种递归执行。

您在这里所经历的,就是您已经正确称之为 deadlock 的事情。启动下一个线程并等待它的第一个线程持有一个 lock ,所有后续任务都将死锁,同时等待相同的 lock 被释放(这在你的情况下永远不会)。我建议您在任务中启动自己的线程而不是使用池,例如:

import concurrent.futures
import threading


class TaskWrapper(threading.Thread):

    def __init__(self, depth, *args, **kwargs):
        self._depth = depth
        self._result = None
        super(TaskWrapper, self).__init__(*args, **kwargs)

    def run(self):
        self._result = task(self._depth)

    def get(self):
        self.join()
        return self._result

e = concurrent.futures.ThreadPoolExecutor(max_workers=10)


def task(depth):
    print 'started depth %d' % (depth, )
    if depth > 10:
        return depth
    else:
        t = TaskWrapper(depth + 1)
        t.start()
        return t.get()

f = e.submit(task, 0)
print f.result()

使用协同程序,您的代码可以重写为:

import asyncio

@asyncio.coroutine
def task(depth):
    print('started depth %d' % (depth, ))
    if depth > 10:
        return depth
    else:
        # create new task
        t = asyncio.async(task(depth + 1))
        # wait for task to complete
        yield from t
        # get the result of the task
        return t.result()

loop = asyncio.get_event_loop()
result = loop.run_until_complete(task(1))
print(result)
loop.close()

但是,我很难理解为什么您需要所有这些额外的代码。在您的示例代码中,您总是直接等待任务的结果,因此如果没有执行程序,您的代码将 运行 没有什么不同。例如,以下将产生相同的结果

def task(depth):
    print 'started depth %d' % (depth, )
    if depth > 10:
        return depth
    else:
        task(depth + 1)

我认为文档中的这个示例更好地展示了异步协程如何能够并行执行任务。此示例创建 3 个任务,每个任务计算不同的阶乘。请注意当每个任务让步给另一个协程时(在本例中 async.sleep),另一个任务如何被允许继续执行。

import asyncio

@asyncio.coroutine
def factorial(name, number):
    f = 1
    for i in range(2, number+1):
        print("Task %s: Compute factorial(%s)..." % (name, i))
        yield from asyncio.sleep(1)
        f *= i
    print("Task %s: factorial(%s) = %s" % (name, number, f))

loop = asyncio.get_event_loop()
tasks = [
    asyncio.ensure_future(factorial("A", 2)),
    asyncio.ensure_future(factorial("B", 3)),
    asyncio.ensure_future(factorial("C", 4))]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

输出:

Task A: Compute factorial(2)...
Task B: Compute factorial(2)...
Task C: Compute factorial(2)...
Task A: factorial(2) = 2
Task B: Compute factorial(3)...
Task C: Compute factorial(3)...
Task B: factorial(3) = 6
Task C: Compute factorial(4)...
Task C: factorial(4) = 24