Python asyncio及时处理事件

Python asyncio timely event handling

我正在使用 python 的 asyncio 库,在 this example 之后,我编写了以下脚本:

# file: get_rand.py
from random import choice
from time import sleep
import sys

def main():
    sys.stderr.write('child: starting loop...\n')
    for _ in range(5):
        print(choice('abcdefghijklmnopqrstuvwxyz'))
        sys.stderr.write('child: going to sleep\n')
        sleep(0.5)

if __name__ == '__main__':
    main()

和:

# file: async_test.py
import asyncio
import time


class Protocol(asyncio.SubprocessProtocol):

    def __init__(self, exit_f):
        self.exit = exit_f
        print('Protocol initialised')

    def pipe_data_received(self, fd, data):
        print('Data received')
        if fd == 1:
            with open('rand_file.txt', 'a') as out:
                out.write(bytes(data).decode('ascii'))
        elif fd == 2:
            print('Received error data!')
            print(data)

    def pipe_connection_lost(self, fd, exc):
        print('Pipe connection lost')
        if exc is not None:
            print(exc)
            raise exc

    def process_exited(self):
        self.exit.set_result(True)
        print('Subprocess exited')


@asyncio.coroutine
def mycoro():
    loop = asyncio.get_event_loop()
    exit_future = asyncio.Future(loop=loop)
    print('creating process...')
    subprocess = loop.subprocess_exec(lambda: Protocol(exit_future),
                                      'python3.5', 'get_rand.py',
                                      stdin=None, stderr=None)
    transp, proto = yield from subprocess
    print('waiting for subprocess to finish...')
    yield from exit_future
    transp.close()


def main():
    loop = asyncio.get_event_loop()
    loop.run_until_complete(mycoro())
    loop.close()

执行此代码时,我得到以下信息:

$ python3.5 async_test.py 
creating process...
Protocol initialised
waiting for subprocess to finish...
child: starting loop...
child: going to sleep 
child: going to sleep 
child: going to sleep 
child: going to sleep 
child: going to sleep 
Data received
Pipe connection lost
Subprocess exited

关于这一切我有很多问题:

  1. Apparent只有 child 发送的数据仅在 child 终止后触发 pipe_data_received 事件一次。有没有办法生成 child 进程并在每次写入 stdout 时触发 pipe_data_received 事件?
  2. 如果我删除行 transp, proto = yield from subprocess 整个事情就挂在 creating process... 上,所以看起来 child 直到 parent 才开始 transp, proto = yield from subprocess。那是对的吗?这是为什么?
  3. 如果我希望我的进程产生一个 child,它永远运行并定期触发 pipe_data_received,同时启动进程保持其执行流程,并做其他事情怎么办?这是满足这种需求的正确工具吗?

1.

print 将数据写入标准输出缓冲区,默认情况下它们只被刷新一次。您可以添加显式 flush.

for _ in range(5):
   print(choice('abcdefghijklmnopqrstuvwxyz'))
   sys.stdout.flush()

或 ptyhon3.3 及更高版本

for _ in range(5):
    print(choice('abcdefghijklmnopqrstuvwxyz'), flush=True)

更多信息How to flush output of Python print?

2.

subprocess_exec returns 协程。您想要 运行 的每个协程都必须安排在循环中。 yield from 只是安排它并等待它完成(因为 subprocess_exec 完成意味着流程已执行)。

3.

对于后台的运行任务,您还必须循环安排它,但不要等待结果。您可以使用 ensure_future`.

@asyncio.coroutine
def mycoro():
    loop = asyncio.get_event_loop()
    exit_future = asyncio.Future(loop=loop)
    print('creating process...')
    subprocess = loop.subprocess_exec(lambda: Protocol(exit_future),
                                      'python3.5', 'get_rand.py',
                                      stdin=None, stderr=None)
    task = asyncio.ensure_future(subprocess)
    print('Subprocess is handled in the background task')

    # this function is called with run_until_complete, 
    # since that returning means complete we would not
    # finish subprocess task
    # so im leaving it
    yield from exit_future

编辑

这里是 运行ning 循环的简单示例。我删除了所有 exit_future 相关内容,因为不需要。

import asyncio
import time


class Protocol(asyncio.Protocol):

    def __init__(self):
        print('Protocol initialised')

    def pipe_data_received(self, fd, data):
        print('Data received %s' % data)
        if fd == 1:

            with open('rand_file.txt', 'a') as out:
                out.write(bytes(data).decode('ascii'))
        elif fd == 2:
            print('Received error data!')
            print(data)


    def pipe_connection_lost(self, fd, exc):
        print('Pipe connection lost')
        if exc is not None:
            print(exc)
            raise exc

    def process_exited(self):
        print('Subprocess exited')


@asyncio.coroutine
def mycoro():
    loop = asyncio.get_event_loop()
    print('creating process...')
    subprocess = loop.subprocess_exec(lambda: Protocol(),
                                      'python3.5', 'get_rand.py',
                                      stdin=None, stderr=None)
    asyncio.ensure_future(subprocess)
    asyncio.ensure_future(dummy_work())
    print('Mycoro finished, tasks are scheduled')


@asyncio.coroutine
def dummy_work():
    while True:
        yield from asyncio.sleep(1)
        print('dummy work')

def main():
    loop = asyncio.get_event_loop()
    asyncio.ensure_future(mycoro())
    loop.run_forever()
    loop.close()

main()


def main():
    loop = asyncio.get_event_loop()
    asyncio.ensure_future(mycoro())
    loop.run_forever()
    loop.close()

main()