如何以非阻塞方式链接期货?即,如何在不阻塞的情况下将一个未来作为另一个未来的输入?

How to chain futures in a non-blocking manner? That is, how to use one future as an input in another future without blocking?

使用下面的示例,一旦 future1 完成(不阻止 future3 提交),future2 如何使用 future1 的结果?

from concurrent.futures import ProcessPoolExecutor
import time

def wait(seconds):
    time.sleep(seconds)
    return seconds

pool = ProcessPoolExecutor()

s = time.time()
future1 = pool.submit(wait, 5)
future2 = pool.submit(wait, future1.result())
future3 = pool.submit(wait, 10)

time_taken = time.time() - s
print(time_taken)

这可以通过精心设计回调以在第一个操作完成后提交第二个操作来实现。遗憾的是,无法将任意未来传递给 pool.submit,因此需要额外的步骤将两个未来绑定在一起。

这是一个可能的实现:

import concurrent.futures

def copy_future_state(source, destination):
    if source.cancelled():
        destination.cancel()
    if not destination.set_running_or_notify_cancel():
        return
    exception = source.exception()
    if exception is not None:
        destination.set_exception(exception)
    else:
        result = source.result()
        destination.set_result(result)


def chain(pool, future, fn):
    result = concurrent.futures.Future()

    def callback(_):
        try:
            temp = pool.submit(fn, future.result())
            copy = lambda _: copy_future_state(temp, result)
            temp.add_done_callback(copy)
        except:
            result.cancel()
            raise

    future.add_done_callback(callback)
    return result

请注意 copy_future_stateasyncio.futures._set_concurrent_future_state 的略微修改版本。

用法:

from concurrent.futures import ProcessPoolExecutor

def wait(seconds):
    time.sleep(seconds)
    return seconds

pool = ProcessPoolExecutor()
future1 = pool.submit(wait, 5)
future2 = chain(pool, future1, wait)
future3 = pool.submit(wait, 10)