如何在功能上组合期货?

How to functionally compose futures?

我有一个无法跨 ProcessPoolExecutor 分配的线程对象,但希望 return 有一个未来。如果我已经有了未来,有没有办法应用到它的完成值,例如,Future a -> (a -> b) -> Future b?

import concurrent.futures
import threading

def three(x):
    return 2+x


if __name__ == '__main__':
    trackedItem = (3, threading.Event())
    pool = concurrent.futures.ProcessPoolExecutor(3)
    poolJob = (q.submit(three, trackedItem[0]),trackedItem[1]) #(Future(int), Event)
    *** something magic goes here ***
    #Trying to transform it into Future(int,Event)

这是一种使用更简单的设置代码的方法,没有 threading.Event 因为这似乎不是解决问题所必需的。基本上,您可以自己创建 future_b 作为一个新的 Future(),然后在 future_a 上使用 add_done_callback 方法来设置 future_b 的结果。这里,func_a是计算future_a的结果的计算,func_b是使用future_a的结果计算future_b的结果的计算。

from concurrent.futures import ProcessPoolExecutor, Future

def func_a(x):
    return 2 + x

def func_b(x):
    return 10 * x

if __name__ == '__main__':
    pool = ProcessPoolExecutor(3)
    future_a = pool.submit(func_a, 3)

    future_b = Future()
    future_b.set_running_or_notify_cancel()

    def callback(f):
        x = f.result()
        y = func_b(x)
        future_b.set_result(y)

    future_a.add_done_callback(callback)

    print(future_b.result()) # 50

如果你想要一个辅助函数来做这个,你可以写一个:map_future 需要一个未来和一个映射函数,并且 returns 根据需要新映射的未来。此版本处理异常,以防 f.result()func_b 抛出一个:

def map_future(future_a, func):
    future_b = Future()
    future_b.set_running_or_notify_cancel()

    def callback(f):
        try:
            x = f.result()
            y = func(x)
            future_b.set_result(y)
        except Exception as e:
            future_b.set_exception(e)

    future_a.add_done_callback(callback)
    return future_b

注意事项:这违背了 Future class 文档中的建议,它说:

Future instances are created by Executor.submit() and should not be created directly except for testing.

此外,如果您在回调中有任何不是 Exception 的子 class 的错误,根据文档,它们将是 "logged and ignored"。为简单起见,我选择在此代码中仅捕获 Exception,但您可能更喜欢 sys.exc_info()[0] 捕获所有可能引发的事情的方式。

@kaya3 提供了一个很好的答案,但是我 运行 在为它添加异常处理以关闭池时遇到了问题。您可以在下面找到我的示例 cpchung_example,了解如何在功能上组合未来。还是要加异常处理,目前还没有很好的解决办法

为了比较,我把它们都放在一个文件里了:



from concurrent.futures import ProcessPoolExecutor, Future
from concurrent.futures.thread import ThreadPoolExecutor


def map_future(future_a, func):
    future_b = Future()
    future_b.set_running_or_notify_cancel()

    def callback(f):
        try:
            x = f.result()
            y = func(x)
            future_b.set_result(y)
        except Exception as e:

            future_b.set_exception(e)

    future_a.add_done_callback(callback)
    return future_b


def func_a(x):
    return 2 + x


def func_b(x):
    return 3 * x


def func_c(x):
    raise NameError('Hi There')
    return 4 * x


def kaya3_example():
    future_a = pool.submit(func_a, 3)

    future_b = Future()
    future_b.set_running_or_notify_cancel()

    def callback(f):
        x = f.result()
        y = func_b(x)
        future_b.set_result(y)

    future_a.add_done_callback(callback)

    print(future_b.result())  # 50


def exception_handling():
    try:

        future_a = pool.submit(func_a, 3)
        future_b = map_future(future_a, func_b)
        future_c = map_future(future_b, func_c)

        print(future_c.result())

    except Exception as e:
        pool.shutdown()

    pool.shutdown()


def f(x, y):
    return x * y


def cpchung_example():
    with ThreadPoolExecutor(max_workers=1) as executor:
        a = executor.submit(f, 2, 3)
        b = executor.submit(f, 4, 5)
        c = executor.submit(f, a.result(), b.result())

        print(c.result())


if __name__ == '__main__':
    pool = ProcessPoolExecutor(3)

    kaya3_example()

    cpchung_example()

    # exception_handling()  # not working, still wip