如何在回调中获得未来的结果?

How to get the result of a future in a callback?

最近将 add_done_callback 方法添加到分布式 Future 对象中,它允许您在未来完成后采取一些行动,无论它是否成功。

http://distributed.readthedocs.io/en/latest/api.html?highlight=add_done_callback#distributed.client.Future.add_done_callback

如果您尝试在传递的未来对象上直接调用任何方法 resultexceptiontraceback,回调函数将挂起。

异常和回溯可以在回调中访问,如下所示: fut._exception().result() fut._traceback().result()

对结果尝试相同的模式 - 即 fut._result().result() 引发异常:

  File "C:\Python\lib\site-packages\tornado\concurrent.py", line 316, in _check_done
    raise Exception("DummyFuture does not support blocking for results")
Exception: DummyFuture does not support blocking for results

无法在回调中访问未来的结果,能够添加回调对我来说用处有限。

我是不是遗漏了什么 - 有没有办法在回调中获得未来的结果?

在 asyncio 文档中,它似乎给出了一个可以直接访问 result 方法的示例:

https://docs.python.org/3/library/asyncio-task.html#example-future-with-run-forever

...我不确定这与 tornado/distributed 有什么关系,但如果能够做到这一点,非常 很有用。

from distributed import Client


client = Client("127.0.0.1:8786")

def f(delay):
    from time import sleep
    from numpy.random import randn
    sleep(delay)
    if randn() > 1:
        1/0
    return delay

def callback(fut):
    import logging
    logger = logging.getLogger('distributed')
    if fut.status == 'finished':
        res = future._result().result()  # <-------------- Doesn't work!
        logger.info("{!r} - {!s}".format(fut, res))
    else:
        logger.info("{!r} - {!s}".format(fut, fut.status))


args = rand(10)
futs = client.map(f, args)
for fut in futs:
    fut.add_done_callback(callback)

目前您的回调在 Tornado 事件循环中被调用。如果你想获得未来的结果,你将不得不使用 Tornado API.

这是一个最小的例子:

In [1]: from distributed import Client
In [2]: client = Client()
In [3]: def inc(x):
   ...:     return x + 1
   ...: 
In [4]: from tornado import gen

In [5]: @gen.coroutine
   ...: def callback(future):
   ...:     result = yield future._result()
   ...:     print(result * 10)
   ...:     
In [6]: future = client.submit(inc, 1)

In [7]: future.add_done_callback(callback)

20

但是,您的问题强调,这可能不是用户与 add_done_callback 交互的最直观方式,因此如果我们在更高版本中引入重大更改,我不会感到惊讶。

In [8]: import distributed

In [8]: distributed.__version__
Out[8]: '1.14.0'