如何以非阻塞方式链接期货?即,如何在不阻塞的情况下将一个未来作为另一个未来的输入?
How to chain futures in a non-blocking manner? That is, how to use one future as an input in another future without blocking?
使用下面的示例,一旦 future1
完成(不阻止 future3
提交),future2
如何使用 future1
的结果?
from concurrent.futures import ProcessPoolExecutor
import time
def wait(seconds):
time.sleep(seconds)
return seconds
pool = ProcessPoolExecutor()
s = time.time()
future1 = pool.submit(wait, 5)
future2 = pool.submit(wait, future1.result())
future3 = pool.submit(wait, 10)
time_taken = time.time() - s
print(time_taken)
这可以通过精心设计回调以在第一个操作完成后提交第二个操作来实现。遗憾的是,无法将任意未来传递给 pool.submit
,因此需要额外的步骤将两个未来绑定在一起。
这是一个可能的实现:
import concurrent.futures
def copy_future_state(source, destination):
if source.cancelled():
destination.cancel()
if not destination.set_running_or_notify_cancel():
return
exception = source.exception()
if exception is not None:
destination.set_exception(exception)
else:
result = source.result()
destination.set_result(result)
def chain(pool, future, fn):
result = concurrent.futures.Future()
def callback(_):
try:
temp = pool.submit(fn, future.result())
copy = lambda _: copy_future_state(temp, result)
temp.add_done_callback(copy)
except:
result.cancel()
raise
future.add_done_callback(callback)
return result
请注意 copy_future_state
是 asyncio.futures._set_concurrent_future_state 的略微修改版本。
用法:
from concurrent.futures import ProcessPoolExecutor
def wait(seconds):
time.sleep(seconds)
return seconds
pool = ProcessPoolExecutor()
future1 = pool.submit(wait, 5)
future2 = chain(pool, future1, wait)
future3 = pool.submit(wait, 10)
使用下面的示例,一旦 future1
完成(不阻止 future3
提交),future2
如何使用 future1
的结果?
from concurrent.futures import ProcessPoolExecutor
import time
def wait(seconds):
time.sleep(seconds)
return seconds
pool = ProcessPoolExecutor()
s = time.time()
future1 = pool.submit(wait, 5)
future2 = pool.submit(wait, future1.result())
future3 = pool.submit(wait, 10)
time_taken = time.time() - s
print(time_taken)
这可以通过精心设计回调以在第一个操作完成后提交第二个操作来实现。遗憾的是,无法将任意未来传递给 pool.submit
,因此需要额外的步骤将两个未来绑定在一起。
这是一个可能的实现:
import concurrent.futures
def copy_future_state(source, destination):
if source.cancelled():
destination.cancel()
if not destination.set_running_or_notify_cancel():
return
exception = source.exception()
if exception is not None:
destination.set_exception(exception)
else:
result = source.result()
destination.set_result(result)
def chain(pool, future, fn):
result = concurrent.futures.Future()
def callback(_):
try:
temp = pool.submit(fn, future.result())
copy = lambda _: copy_future_state(temp, result)
temp.add_done_callback(copy)
except:
result.cancel()
raise
future.add_done_callback(callback)
return result
请注意 copy_future_state
是 asyncio.futures._set_concurrent_future_state 的略微修改版本。
用法:
from concurrent.futures import ProcessPoolExecutor
def wait(seconds):
time.sleep(seconds)
return seconds
pool = ProcessPoolExecutor()
future1 = pool.submit(wait, 5)
future2 = chain(pool, future1, wait)
future3 = pool.submit(wait, 10)