从 Python、a.k.a 中的协程生成值。将回调转换为生成器

Yielding a value from a coroutine in Python, a.k.a. convert callback to generator

我是 Python 和函数式编程的新手。我正在使用版本 2.7.6

我正在使用 Tornado 框架来发出异步网络请求。根据我对函数式编程的了解,我希望我的数据通过使用生成器流过我的代码。我已经使用生成器完成了我需要的大部分工作,并在数据流通过我的函数调用时转换数据。

在我的流的最后,我想对一些数据发出 REST 请求。在我将数据提交给 Tornado 之前,我有一个 for 循环,以启动拉取,然后发送 http 请求。 Tornado 提供的 http 对象将回调函数作为一个选项,并且总是 returns 一个 Future——这实际上是一个 Tornado Future 对象,而不是官方的 Python Future。

我的问题是,由于我现在使用生成器通过我的代码提取数据,所以我不想再使用回调函数。我这样做的原因是,在我从回调中取回数据后,我的数据现在正在通过我的代码推送,我无法再使用生成器。

我的目标是创建一个如下所示的界面:

urls = (...generated urls...)
responses = fetch(urls)

其中 responses 是完整网址的生成器。

我尝试做的事情——在很多事情中——是将回调的结果转换为生成器。我正在考虑这样的事情,尽管我还远未针对我将很快解释的其他问题实施它。但是,我希望我的获取函数看起来像这样:

def fetch(urls):
    def url_generator():
        while True:
            val = yield
            yield val

    @curry
    def handler(gen, response):
        gen.send(response)

    gen = url_generator()

    for u in urls:
        http.fetch(u, callback=handler(gen))

    return gen

我简化了代码和语法以专注于问题,但我认为这会很好地工作。我的策略是定义一个 coroutine/generator,然后我会在收到回复时将其发送给它。

我最头疼的是 coroutine/generator。即使我以上述方式定义一个生成器并执行以下操作,我也会得到一个无限循环——这是我的主要问题之一。

def gen():
    while True:
        val = yield
        print 'val', val
        yield val
        print 'after', val
        break

g = gen()
g.send(None)
g.send(10)

for e in g:
    print e

这会按预期在协程中打印 val 10 after 10 和 break,但 for 循环永远不会获得值 10。当 break 出现时它不会打印任何内容。如果我删除中断,然后我得到无限循环:

val None
None
after None
None
val None
None
after None
None
...

如果我删除 for 循环,那么协程将只打印 val 10,因为它等待第二次 yield。我期待这个。但是,使用它不会产生任何结果。

同样,如果我删除 for 循环并将其替换为 print next(g),则会出现 StopIteration 错误,我认为这意味着我在没有更多值的生成器上调用了 next。

任何人,当我深入研究 Python 时,我完全不知所措。我认为这在 Python 中很常见,以至于有人知道一个很好的方法。我搜索了 'convert callback into generator' 之类的东西,但运气不佳。

另一方面,我可能会从 http 请求中产生每个未来,但我没有太多运气 "waiting" 在未来完成的收益率上。我读了很多关于 'yield from' 的内容,但它似乎是 Python 3 具体的,Tornado 似乎还没有在 Python 3 上工作。

感谢观看,感谢您提供的任何帮助。

龙卷风在 Python 3.

上效果很好

上面简化代码的问题在于它没有按照您的预期进行:

val = yield

您希望生成器在那里暂停(阻塞您的 for 循环)直到其他函数调用 g.send(value),但事实并非如此。相反,代码的行为如下:

val = yield None

因此 for 循环接收 None 值的速度与处理它们的速度一样快。它接收到每个None后,隐式调用g.next(),这与g.send(None)是一样的。因此,您的代码等同于:

def gen():
    while True:
        val = yield None
        print 'val', val
        yield val
        print 'after', val

g = gen()
g.send(None)
g.send(10)

while True:
    try:
        e = g.send(None)
        print e
    except StopIteration:
        break

阅读这个版本的代码,其中的隐式行为是显式的,我希望它清楚为什么它只是在无限循环中生成 None

您需要的是一种方法,让一个函数将项目添加到队列的头部,而另一个函数阻止等待项目,并在它们准备好时将它们从队列的尾部拉出。从 Tornado 4.2 开始,我们有了:

http://www.tornadoweb.org/en/stable/queues.html

web spider的例子和你想做的很接近,我相信你可以适应它:

http://www.tornadoweb.org/en/stable/guide/queues.html