如何连接持续生产和使用数据的 asyncio.coroutines?

How do I connect asyncio.coroutines that continually produce and consume data?

我正在尝试学习如何(惯用地)使用 Python 3.4 的 asyncio。我最大的绊脚石是如何 "chain" 持续消耗数据的协程,用它更新状态,并允许另一个协程使用该状态。

我期望从这个示例程序中观察到的行为只是定期报告从子进程接收到的数字总和。报告的发生率应该与 Source 对象从子进程接收数字的速度大致相同。报告功能中的 IO 阻塞不应阻止从子进程读取。如果报告函数阻塞的时间比从子进程读取的迭代时间长,我不在乎它是跳过还是一次报告一堆;但是在足够长的时间范围内,reporter() 的迭代次数应该与 expect_exact() 的迭代次数差不多。

#!/usr/bin/python3
import asyncio
import pexpect

class Source:

    def __init__(self):
        self.flag = asyncio.Event()
        self.sum = 0

    def start(self):
        self.flag.set()

    def stop(self):
        self.flag.clear()

    @asyncio.coroutine
    def run(self):
        yield from self.flag.wait()

        p = pexpect.spawn(
            "python -c "
            "'import random, time\n"
            "while True: print(random.choice((-1, 1))); time.sleep(0.5)'")

        while self.flag.is_set():
            yield from p.expect_exact('\n', async=True)
            self.sum += int(p.before)

        p.terminate()

@asyncio.coroutine
def reporter(source):
    while True:
        # Something like:
        new_sum = yield from source # ???
        print("New sum is: {:d}".format(new_sum))
        # Potentially some other blocking operation
        yield from limited_throughput.write(new_sum)

def main():
    loop = asyncio.get_event_loop()

    source = Source()
    loop.call_later(1, source.start)
    loop.call_later(11, source.stop)

    # Again, not sure what goes here...
    asyncio.async(reporter(source))

    loop.run_until_complete(source.run())
    loop.close()

if __name__ == '__main__':
    main()

此示例需要从 git 安装 pexpect;您可以轻松地将 run() 替换为:

@asyncio.coroutine
def run(self):
    yield from self.flag.wait()

    while self.flag.is_set():
        value = yield from asyncio.sleep(0.5, random.choice((-1, 1)))
        self.sum += value

但我感兴趣的真正子流程需要是 pty 中的 运行,我认为这意味着 asyncio 中提供的子流程 transport/protocol 框架获胜这已经足够了。关键是异步activity的来源是一个可以与yield from.

一起使用的协程

请注意,此示例中的 reporter() 函数不是有效代码;我的问题是我不知道那里应该放什么。理想情况下,我希望将 reporter() 代码与 run() 分开;本练习的目的是了解如何使用 asyncio.

中的组件将更复杂的程序分解为更小的代码单元

有没有办法用 asyncio 模块构建这种行为?

asyncio 中的锁定原语和队列本身提供了一些执行此操作的机制。

条件

asyncio.Condition() 提供了一种通知条件的方法。如果您删除一些事件并不重要,请使用它。

class Source:

    def __init__(self):
        self.flag = asyncio.Event()
        self.sum = 0

        # For consumers
        self.ready = asyncio.Condition()

    def start(self):
        self.flag.set()

    def stop(self):
        self.flag.clear()

    @asyncio.coroutine
    def run(self):
        yield from self.flag.wait()

        p = pexpect.spawn(
            "python -c "
            "'import random, time\n"
            "while True: print(random.choice((-1, 1))); time.sleep(0.5)'")

        while self.flag.is_set():
            yield from p.expect_exact('\n', async=True)
            self.sum += int(p.before)
            with (yield from self.ready):
                self.ready.notify_all() # Or just notify() depending on situation

        p.terminate()

    @asyncio.coroutine
    def read(self):
        with (yield from self.ready):
            yield from self.ready.wait()
            return self.sum


@asyncio.coroutine
def reporter(source):
    while True:
        # Something like:
        new_sum = yield from source.read()
        print("New sum is: {:d}".format(new_sum))
        # Other potentially blocking stuff in here

队列

asyncio.Queue() 允许您将数据放入队列(后进先出或先进先出)并从中读取其他内容。如果您绝对想响应每个事件,请使用此选项,即使您的消费者(及时)落后了。请注意,如果您限制队列的大小,如果您的消费者足够慢,您的生产者最终会阻塞。

请注意,这也允许我们将 sum 转换为局部变量。

#!/usr/bin/python3
import asyncio
import pexpect

class Source:

    def __init__(self):
        self.flag = asyncio.Event()
        # NOTE: self.sum removed!

        # For consumers
        self.output = asyncio.Queue()

    def start(self):
        self.flag.set()

    def stop(self):
        self.flag.clear()

    @asyncio.coroutine
    def run(self):
        yield from self.flag.wait()

        sum = 0

        p = pexpect.spawn(
            "python -c "
            "'import random, time\n"
            "while True: print(random.choice((-1, 1))); time.sleep(0.5)'")

        while self.flag.is_set():
            yield from p.expect_exact('\n', async=True)
            sum += int(p.before)
            yield from self.output.put(sum)

        p.terminate()

    @asyncio.coroutine
    def read(self):
        return (yield from self.output.get())

@asyncio.coroutine
def reporter(source):
    while True:
        # Something like:
        new_sum = yield from source.read()
        print("New sum is: {:d}".format(new_sum))
        # Other potentially blocking stuff here

请注意,Python 3.4.4 将 task_done()join() 方法添加到 Queue,以允许您在知道消费者是完成(如适用)。