如何在功能上组合期货?
How to functionally compose futures?
我有一个无法跨 ProcessPoolExecutor 分配的线程对象,但希望 return 有一个未来。如果我已经有了未来,有没有办法应用到它的完成值,例如,Future a -> (a -> b) -> Future b
?
import concurrent.futures
import threading
def three(x):
return 2+x
if __name__ == '__main__':
trackedItem = (3, threading.Event())
pool = concurrent.futures.ProcessPoolExecutor(3)
poolJob = (q.submit(three, trackedItem[0]),trackedItem[1]) #(Future(int), Event)
*** something magic goes here ***
#Trying to transform it into Future(int,Event)
这是一种使用更简单的设置代码的方法,没有 threading.Event
因为这似乎不是解决问题所必需的。基本上,您可以自己创建 future_b
作为一个新的 Future()
,然后在 future_a
上使用 add_done_callback
方法来设置 future_b
的结果。这里,func_a
是计算future_a
的结果的计算,func_b
是使用future_a
的结果计算future_b
的结果的计算。
from concurrent.futures import ProcessPoolExecutor, Future
def func_a(x):
return 2 + x
def func_b(x):
return 10 * x
if __name__ == '__main__':
pool = ProcessPoolExecutor(3)
future_a = pool.submit(func_a, 3)
future_b = Future()
future_b.set_running_or_notify_cancel()
def callback(f):
x = f.result()
y = func_b(x)
future_b.set_result(y)
future_a.add_done_callback(callback)
print(future_b.result()) # 50
如果你想要一个辅助函数来做这个,你可以写一个:map_future
需要一个未来和一个映射函数,并且 returns 根据需要新映射的未来。此版本处理异常,以防 f.result()
或 func_b
抛出一个:
def map_future(future_a, func):
future_b = Future()
future_b.set_running_or_notify_cancel()
def callback(f):
try:
x = f.result()
y = func(x)
future_b.set_result(y)
except Exception as e:
future_b.set_exception(e)
future_a.add_done_callback(callback)
return future_b
注意事项:这违背了 Future
class 文档中的建议,它说:
Future
instances are created by Executor.submit()
and should not be created directly except for testing.
此外,如果您在回调中有任何不是 Exception
的子 class 的错误,根据文档,它们将是 "logged and ignored"。为简单起见,我选择在此代码中仅捕获 Exception
,但您可能更喜欢 sys.exc_info()[0]
捕获所有可能引发的事情的方式。
@kaya3 提供了一个很好的答案,但是我 运行 在为它添加异常处理以关闭池时遇到了问题。您可以在下面找到我的示例 cpchung_example
,了解如何在功能上组合未来。还是要加异常处理,目前还没有很好的解决办法
为了比较,我把它们都放在一个文件里了:
from concurrent.futures import ProcessPoolExecutor, Future
from concurrent.futures.thread import ThreadPoolExecutor
def map_future(future_a, func):
future_b = Future()
future_b.set_running_or_notify_cancel()
def callback(f):
try:
x = f.result()
y = func(x)
future_b.set_result(y)
except Exception as e:
future_b.set_exception(e)
future_a.add_done_callback(callback)
return future_b
def func_a(x):
return 2 + x
def func_b(x):
return 3 * x
def func_c(x):
raise NameError('Hi There')
return 4 * x
def kaya3_example():
future_a = pool.submit(func_a, 3)
future_b = Future()
future_b.set_running_or_notify_cancel()
def callback(f):
x = f.result()
y = func_b(x)
future_b.set_result(y)
future_a.add_done_callback(callback)
print(future_b.result()) # 50
def exception_handling():
try:
future_a = pool.submit(func_a, 3)
future_b = map_future(future_a, func_b)
future_c = map_future(future_b, func_c)
print(future_c.result())
except Exception as e:
pool.shutdown()
pool.shutdown()
def f(x, y):
return x * y
def cpchung_example():
with ThreadPoolExecutor(max_workers=1) as executor:
a = executor.submit(f, 2, 3)
b = executor.submit(f, 4, 5)
c = executor.submit(f, a.result(), b.result())
print(c.result())
if __name__ == '__main__':
pool = ProcessPoolExecutor(3)
kaya3_example()
cpchung_example()
# exception_handling() # not working, still wip
我有一个无法跨 ProcessPoolExecutor 分配的线程对象,但希望 return 有一个未来。如果我已经有了未来,有没有办法应用到它的完成值,例如,Future a -> (a -> b) -> Future b
?
import concurrent.futures
import threading
def three(x):
return 2+x
if __name__ == '__main__':
trackedItem = (3, threading.Event())
pool = concurrent.futures.ProcessPoolExecutor(3)
poolJob = (q.submit(three, trackedItem[0]),trackedItem[1]) #(Future(int), Event)
*** something magic goes here ***
#Trying to transform it into Future(int,Event)
这是一种使用更简单的设置代码的方法,没有 threading.Event
因为这似乎不是解决问题所必需的。基本上,您可以自己创建 future_b
作为一个新的 Future()
,然后在 future_a
上使用 add_done_callback
方法来设置 future_b
的结果。这里,func_a
是计算future_a
的结果的计算,func_b
是使用future_a
的结果计算future_b
的结果的计算。
from concurrent.futures import ProcessPoolExecutor, Future
def func_a(x):
return 2 + x
def func_b(x):
return 10 * x
if __name__ == '__main__':
pool = ProcessPoolExecutor(3)
future_a = pool.submit(func_a, 3)
future_b = Future()
future_b.set_running_or_notify_cancel()
def callback(f):
x = f.result()
y = func_b(x)
future_b.set_result(y)
future_a.add_done_callback(callback)
print(future_b.result()) # 50
如果你想要一个辅助函数来做这个,你可以写一个:map_future
需要一个未来和一个映射函数,并且 returns 根据需要新映射的未来。此版本处理异常,以防 f.result()
或 func_b
抛出一个:
def map_future(future_a, func):
future_b = Future()
future_b.set_running_or_notify_cancel()
def callback(f):
try:
x = f.result()
y = func(x)
future_b.set_result(y)
except Exception as e:
future_b.set_exception(e)
future_a.add_done_callback(callback)
return future_b
注意事项:这违背了 Future
class 文档中的建议,它说:
Future
instances are created byExecutor.submit()
and should not be created directly except for testing.
此外,如果您在回调中有任何不是 Exception
的子 class 的错误,根据文档,它们将是 "logged and ignored"。为简单起见,我选择在此代码中仅捕获 Exception
,但您可能更喜欢 sys.exc_info()[0]
捕获所有可能引发的事情的方式。
@kaya3 提供了一个很好的答案,但是我 运行 在为它添加异常处理以关闭池时遇到了问题。您可以在下面找到我的示例 cpchung_example
,了解如何在功能上组合未来。还是要加异常处理,目前还没有很好的解决办法
为了比较,我把它们都放在一个文件里了:
from concurrent.futures import ProcessPoolExecutor, Future
from concurrent.futures.thread import ThreadPoolExecutor
def map_future(future_a, func):
future_b = Future()
future_b.set_running_or_notify_cancel()
def callback(f):
try:
x = f.result()
y = func(x)
future_b.set_result(y)
except Exception as e:
future_b.set_exception(e)
future_a.add_done_callback(callback)
return future_b
def func_a(x):
return 2 + x
def func_b(x):
return 3 * x
def func_c(x):
raise NameError('Hi There')
return 4 * x
def kaya3_example():
future_a = pool.submit(func_a, 3)
future_b = Future()
future_b.set_running_or_notify_cancel()
def callback(f):
x = f.result()
y = func_b(x)
future_b.set_result(y)
future_a.add_done_callback(callback)
print(future_b.result()) # 50
def exception_handling():
try:
future_a = pool.submit(func_a, 3)
future_b = map_future(future_a, func_b)
future_c = map_future(future_b, func_c)
print(future_c.result())
except Exception as e:
pool.shutdown()
pool.shutdown()
def f(x, y):
return x * y
def cpchung_example():
with ThreadPoolExecutor(max_workers=1) as executor:
a = executor.submit(f, 2, 3)
b = executor.submit(f, 4, 5)
c = executor.submit(f, a.result(), b.result())
print(c.result())
if __name__ == '__main__':
pool = ProcessPoolExecutor(3)
kaya3_example()
cpchung_example()
# exception_handling() # not working, still wip