在 asyncio(和观察者模式)中链接协程

chaining coroutines in asyncio (and observer pattern)

我无法理解协程是如何链接在一起的。在一个比 hello world 或 factorials 稍微简单的例子中,我想要一个循环,它不断地监视文件修改时间,然后在文件被触摸时打印出时间:

#!/usr/bin/env python3
import os
import asyncio

@asyncio.coroutine
def pathmonitor(path):
    modtime = os.path.getmtime(path)
    while True:
        new_time = os.path.getmtime(path)
        if new_time != modtime:
            modtime = new_time
            yield modtime
        yield from asyncio.sleep(1)

@asyncio.coroutine
def printer():
    while True:
        modtime = yield from pathmonitor('/home/users/gnr/tempfile')
        print(modtime)

loop = asyncio.get_event_loop()
loop.run_until_complete(printer())
loop.run_forever()

我希望它能工作 - 但是,当我 运行 它时,我得到一个:

RuntimeError: Task got bad yield: 1426449327.2590399

我做错了什么?

更新:请参阅下面我的回答,了解观察者模式的示例(即,在不使用回调(您必须使用任务)的情况下,有效地允许多个注册者在文件被访问时获取更新)。

UPDATE2:对此有更好的修复:3.5 的 async for(异步迭代器):https://www.python.org/dev/peps/pep-0492/

我通过在链式协程中使用 return 而不是 yield 让你的代码正常工作,就像 chained coroutines example:

#!/usr/bin/env python3
import os
import asyncio2

@asyncio.coroutine
def pathmonitor(path):
    modtime = os.path.getmtime(path)
    while True:
        new_time = os.path.getmtime(path)
        if new_time != modtime:
            modtime = new_time
            return modtime
        yield from asyncio.sleep(1)


@asyncio.coroutine
def printer():
    while True:
        modtime = yield from pathmonitor('/tmp/foo.txt')
        print(modtime)


loop = asyncio.get_event_loop()
loop.run_until_complete(printer())
loop.run_forever()

请注意,printer() 的循环将为每次迭代创建一个新的 pathmonitor 生成器。不确定这是否是您的初衷,但这可能是一个开始。

我发现协同程序 API 和语法让我自己有点困惑。以下是一些我认为有帮助的读物:

正如其他人所指出的,我的错误是我试图像生成器一样使用协程。我不需要依赖生成器进行迭代,而是需要创建多个协程。另外,我需要使用任务来实现没有回调的观察者模式,因为多个注册者可以 yield from 同一个任务。我的路径监视器看起来像这样:

import os
import asyncio

class PathInfo:

    def __init__(self, path):
        self.path = path
        self.modtime = os.path.getmtime(path)
        self.startTask()

    def startTask(self):
        self.task = asyncio.async(self._checkIfTouched())

    def _checkIfTouched(self):
        while True:
            yield from asyncio.sleep(1)
            newtime = os.path.getmtime(self.path)
            if self.modtime != newtime:
                self.modtime = newtime
                return newtime

class PathMonitor:

    def __init__(self):
        self._info = {}

    @asyncio.coroutine
    def wasTouched(self, path):
        try:
            info = self._info[path]
        except KeyError:
            self._info[path] = info = PathInfo(path)
        if info.task.done():
            info.startTask()
        modtime = yield from info.task
        return modtime

def printer():
    while True:
        modtime = yield from mon.wasTouched('/tmp/myfile')
        print(modtime)

mon = PathMonitor()

loop = asyncio.get_event_loop()
asyncio.async(printer())
loop.run_forever()