Python asyncio及时处理事件
Python asyncio timely event handling
我正在使用 python 的 asyncio 库,在 this example 之后,我编写了以下脚本:
# file: get_rand.py
from random import choice
from time import sleep
import sys
def main():
sys.stderr.write('child: starting loop...\n')
for _ in range(5):
print(choice('abcdefghijklmnopqrstuvwxyz'))
sys.stderr.write('child: going to sleep\n')
sleep(0.5)
if __name__ == '__main__':
main()
和:
# file: async_test.py
import asyncio
import time
class Protocol(asyncio.SubprocessProtocol):
def __init__(self, exit_f):
self.exit = exit_f
print('Protocol initialised')
def pipe_data_received(self, fd, data):
print('Data received')
if fd == 1:
with open('rand_file.txt', 'a') as out:
out.write(bytes(data).decode('ascii'))
elif fd == 2:
print('Received error data!')
print(data)
def pipe_connection_lost(self, fd, exc):
print('Pipe connection lost')
if exc is not None:
print(exc)
raise exc
def process_exited(self):
self.exit.set_result(True)
print('Subprocess exited')
@asyncio.coroutine
def mycoro():
loop = asyncio.get_event_loop()
exit_future = asyncio.Future(loop=loop)
print('creating process...')
subprocess = loop.subprocess_exec(lambda: Protocol(exit_future),
'python3.5', 'get_rand.py',
stdin=None, stderr=None)
transp, proto = yield from subprocess
print('waiting for subprocess to finish...')
yield from exit_future
transp.close()
def main():
loop = asyncio.get_event_loop()
loop.run_until_complete(mycoro())
loop.close()
执行此代码时,我得到以下信息:
$ python3.5 async_test.py
creating process...
Protocol initialised
waiting for subprocess to finish...
child: starting loop...
child: going to sleep
child: going to sleep
child: going to sleep
child: going to sleep
child: going to sleep
Data received
Pipe connection lost
Subprocess exited
关于这一切我有很多问题:
- Apparent只有 child 发送的数据仅在 child 终止后触发 pipe_data_received 事件一次。有没有办法生成 child 进程并在每次写入 stdout 时触发 pipe_data_received 事件?
- 如果我删除行
transp, proto = yield from subprocess
整个事情就挂在 creating process...
上,所以看起来 child 直到 parent 才开始 transp, proto = yield from subprocess
。那是对的吗?这是为什么?
- 如果我希望我的进程产生一个 child,它永远运行并定期触发 pipe_data_received,同时启动进程保持其执行流程,并做其他事情怎么办?这是满足这种需求的正确工具吗?
1.
print
将数据写入标准输出缓冲区,默认情况下它们只被刷新一次。您可以添加显式 flush
.
for _ in range(5):
print(choice('abcdefghijklmnopqrstuvwxyz'))
sys.stdout.flush()
或 ptyhon3.3 及更高版本
for _ in range(5):
print(choice('abcdefghijklmnopqrstuvwxyz'), flush=True)
更多信息How to flush output of Python print?。
2.
subprocess_exec
returns 协程。您想要 运行 的每个协程都必须安排在循环中。 yield from
只是安排它并等待它完成(因为 subprocess_exec
完成意味着流程已执行)。
3.
对于后台的运行任务,您还必须循环安排它,但不要等待结果。您可以使用 ensure_future`.
@asyncio.coroutine
def mycoro():
loop = asyncio.get_event_loop()
exit_future = asyncio.Future(loop=loop)
print('creating process...')
subprocess = loop.subprocess_exec(lambda: Protocol(exit_future),
'python3.5', 'get_rand.py',
stdin=None, stderr=None)
task = asyncio.ensure_future(subprocess)
print('Subprocess is handled in the background task')
# this function is called with run_until_complete,
# since that returning means complete we would not
# finish subprocess task
# so im leaving it
yield from exit_future
编辑
这里是 运行ning 循环的简单示例。我删除了所有 exit_future
相关内容,因为不需要。
import asyncio
import time
class Protocol(asyncio.Protocol):
def __init__(self):
print('Protocol initialised')
def pipe_data_received(self, fd, data):
print('Data received %s' % data)
if fd == 1:
with open('rand_file.txt', 'a') as out:
out.write(bytes(data).decode('ascii'))
elif fd == 2:
print('Received error data!')
print(data)
def pipe_connection_lost(self, fd, exc):
print('Pipe connection lost')
if exc is not None:
print(exc)
raise exc
def process_exited(self):
print('Subprocess exited')
@asyncio.coroutine
def mycoro():
loop = asyncio.get_event_loop()
print('creating process...')
subprocess = loop.subprocess_exec(lambda: Protocol(),
'python3.5', 'get_rand.py',
stdin=None, stderr=None)
asyncio.ensure_future(subprocess)
asyncio.ensure_future(dummy_work())
print('Mycoro finished, tasks are scheduled')
@asyncio.coroutine
def dummy_work():
while True:
yield from asyncio.sleep(1)
print('dummy work')
def main():
loop = asyncio.get_event_loop()
asyncio.ensure_future(mycoro())
loop.run_forever()
loop.close()
main()
def main():
loop = asyncio.get_event_loop()
asyncio.ensure_future(mycoro())
loop.run_forever()
loop.close()
main()
我正在使用 python 的 asyncio 库,在 this example 之后,我编写了以下脚本:
# file: get_rand.py
from random import choice
from time import sleep
import sys
def main():
sys.stderr.write('child: starting loop...\n')
for _ in range(5):
print(choice('abcdefghijklmnopqrstuvwxyz'))
sys.stderr.write('child: going to sleep\n')
sleep(0.5)
if __name__ == '__main__':
main()
和:
# file: async_test.py
import asyncio
import time
class Protocol(asyncio.SubprocessProtocol):
def __init__(self, exit_f):
self.exit = exit_f
print('Protocol initialised')
def pipe_data_received(self, fd, data):
print('Data received')
if fd == 1:
with open('rand_file.txt', 'a') as out:
out.write(bytes(data).decode('ascii'))
elif fd == 2:
print('Received error data!')
print(data)
def pipe_connection_lost(self, fd, exc):
print('Pipe connection lost')
if exc is not None:
print(exc)
raise exc
def process_exited(self):
self.exit.set_result(True)
print('Subprocess exited')
@asyncio.coroutine
def mycoro():
loop = asyncio.get_event_loop()
exit_future = asyncio.Future(loop=loop)
print('creating process...')
subprocess = loop.subprocess_exec(lambda: Protocol(exit_future),
'python3.5', 'get_rand.py',
stdin=None, stderr=None)
transp, proto = yield from subprocess
print('waiting for subprocess to finish...')
yield from exit_future
transp.close()
def main():
loop = asyncio.get_event_loop()
loop.run_until_complete(mycoro())
loop.close()
执行此代码时,我得到以下信息:
$ python3.5 async_test.py
creating process...
Protocol initialised
waiting for subprocess to finish...
child: starting loop...
child: going to sleep
child: going to sleep
child: going to sleep
child: going to sleep
child: going to sleep
Data received
Pipe connection lost
Subprocess exited
关于这一切我有很多问题:
- Apparent只有 child 发送的数据仅在 child 终止后触发 pipe_data_received 事件一次。有没有办法生成 child 进程并在每次写入 stdout 时触发 pipe_data_received 事件?
- 如果我删除行
transp, proto = yield from subprocess
整个事情就挂在creating process...
上,所以看起来 child 直到 parent 才开始transp, proto = yield from subprocess
。那是对的吗?这是为什么? - 如果我希望我的进程产生一个 child,它永远运行并定期触发 pipe_data_received,同时启动进程保持其执行流程,并做其他事情怎么办?这是满足这种需求的正确工具吗?
1.
print
将数据写入标准输出缓冲区,默认情况下它们只被刷新一次。您可以添加显式 flush
.
for _ in range(5):
print(choice('abcdefghijklmnopqrstuvwxyz'))
sys.stdout.flush()
或 ptyhon3.3 及更高版本
for _ in range(5):
print(choice('abcdefghijklmnopqrstuvwxyz'), flush=True)
更多信息How to flush output of Python print?。
2.
subprocess_exec
returns 协程。您想要 运行 的每个协程都必须安排在循环中。 yield from
只是安排它并等待它完成(因为 subprocess_exec
完成意味着流程已执行)。
3.
对于后台的运行任务,您还必须循环安排它,但不要等待结果。您可以使用 ensure_future`.
@asyncio.coroutine
def mycoro():
loop = asyncio.get_event_loop()
exit_future = asyncio.Future(loop=loop)
print('creating process...')
subprocess = loop.subprocess_exec(lambda: Protocol(exit_future),
'python3.5', 'get_rand.py',
stdin=None, stderr=None)
task = asyncio.ensure_future(subprocess)
print('Subprocess is handled in the background task')
# this function is called with run_until_complete,
# since that returning means complete we would not
# finish subprocess task
# so im leaving it
yield from exit_future
编辑
这里是 运行ning 循环的简单示例。我删除了所有 exit_future
相关内容,因为不需要。
import asyncio
import time
class Protocol(asyncio.Protocol):
def __init__(self):
print('Protocol initialised')
def pipe_data_received(self, fd, data):
print('Data received %s' % data)
if fd == 1:
with open('rand_file.txt', 'a') as out:
out.write(bytes(data).decode('ascii'))
elif fd == 2:
print('Received error data!')
print(data)
def pipe_connection_lost(self, fd, exc):
print('Pipe connection lost')
if exc is not None:
print(exc)
raise exc
def process_exited(self):
print('Subprocess exited')
@asyncio.coroutine
def mycoro():
loop = asyncio.get_event_loop()
print('creating process...')
subprocess = loop.subprocess_exec(lambda: Protocol(),
'python3.5', 'get_rand.py',
stdin=None, stderr=None)
asyncio.ensure_future(subprocess)
asyncio.ensure_future(dummy_work())
print('Mycoro finished, tasks are scheduled')
@asyncio.coroutine
def dummy_work():
while True:
yield from asyncio.sleep(1)
print('dummy work')
def main():
loop = asyncio.get_event_loop()
asyncio.ensure_future(mycoro())
loop.run_forever()
loop.close()
main()
def main():
loop = asyncio.get_event_loop()
asyncio.ensure_future(mycoro())
loop.run_forever()
loop.close()
main()