使用 asyncio.Queue 安全地将数据从子进程传递到异步任务
Safely pass data from a subprocess into async task with asyncio.Queue
我在 Amazon Linux EC2 实例中有一个目录结构。我想要一个 Python 脚本异步监视此目录(和所有子目录)的文件创建。
我决定 运行 在子进程中 inotifywait 并将输出传递到异步任务中进行处理。我 运行 子进程并在其自己的线程中监视输出,并使用 put_nowait()
将标准输出传递到 asyncio.Queue()
中,该进程由主进程上的异步任务 运行ning 监视线程。
import asyncio
import subprocess
import threading
def watch_dir(dir_to_watch: str, output_queue: asyncio.Queue):
inotify_cmd = f'sudo inotifywait -e create -m -r {dir_to_watch}'
proc = subprocess.Popen(inotify_cmd,
stdout=subprocess.PIPE,
shell=True)
while True:
line = proc.stdout.readline().rstrip()
if not line:
break
output_queue.put_nowait(line)
async def process_lines(input_queue: asyncio.Queue):
while True:
line = await input_queue.get()
# do stuff with line
if __name__ == '__main__':
q = asyncio.Queue()
dir_watch_thread = threading.Thread(target=watch_dir, args=(_dir_to_watch, q))
dir_watch_thread.start()
asyncio.run(process_lines(q))
是否有更好、更 performant/resource 有效的方法来做到这一点?这甚至是 asyncio.Queue()
的安全用法吗?我读过有关 janus 的内容,它自称是在同步和异步上下文之间通过队列传递数据的安全方式。我是否需要使用这样的数据结构(以及为什么)?如果不需要,我不想包含额外的依赖项。
Is this even a safe usage of asyncio.Queue
?
不,因为 asyncio.Queue
是 not thread-safe。您甚至可能会观察到这一点,其症状是您从队列中读取的协程不会立即注意到有项目进入,而只会在事件循环中发生不相关的 IO 或超时事件时唤醒。
解决此问题的一种方法是使用 call_soon_threadsafe
:
# this requires you to pass "loop" as well
loop.call_soon_threadsafe(output_queue.put_nowait, line)
更好的方法是使用 asyncio 自己的子进程处理,它可以让你完全避免线程。例如(未经测试):
async def watch_dir(dir_to_watch, output_queue):
proc = await asyncio.create_subprocess_exec(
'sudo', 'inotifywait', '-e', 'create', '-m',
'-r', dir_to_watch, stdout=subprocess.PIPE)
while True:
line = await proc.stdout.readline()
if not line:
break
await output_queue.put(line.rstrip())
async def process_lines(dir_to_watch):
queue = asyncio.Queue()
# run watch_dir() in the "background"
asyncio.create_task(watch_dir(dir_to_watch), queue)
while True:
line = await queue.get()
print(line) # ...
if __name__ == '__main__':
asyncio.run(process_lines(_watch_dir))
在上面的代码中,我用显式参数替换了 shell=True
的使用,以避免 shell 注入的可能性,尤其是与 sudo
.
相关的情况
Is there a better, more performant/resource efficient way to do this?
在简单的单生产者单消费者设置中,您可以取消队列并只使用生成器:
async def watch_dir(dir_to_watch):
proc = await asyncio.create_subprocess_exec(
'sudo', 'inotifywait', '-e', 'create', '-m',
'-r', dir_to_watch, stdout=subprocess.PIPE)
while True:
line = await proc.stdout.readline()
if not line:
break
yield line.rstrip()
async def process_lines(dir_to_watch):
async for line in watch_dir(dir_to_watch):
print(line) # ...
我在 Amazon Linux EC2 实例中有一个目录结构。我想要一个 Python 脚本异步监视此目录(和所有子目录)的文件创建。
我决定 运行 在子进程中 inotifywait 并将输出传递到异步任务中进行处理。我 运行 子进程并在其自己的线程中监视输出,并使用 put_nowait()
将标准输出传递到 asyncio.Queue()
中,该进程由主进程上的异步任务 运行ning 监视线程。
import asyncio
import subprocess
import threading
def watch_dir(dir_to_watch: str, output_queue: asyncio.Queue):
inotify_cmd = f'sudo inotifywait -e create -m -r {dir_to_watch}'
proc = subprocess.Popen(inotify_cmd,
stdout=subprocess.PIPE,
shell=True)
while True:
line = proc.stdout.readline().rstrip()
if not line:
break
output_queue.put_nowait(line)
async def process_lines(input_queue: asyncio.Queue):
while True:
line = await input_queue.get()
# do stuff with line
if __name__ == '__main__':
q = asyncio.Queue()
dir_watch_thread = threading.Thread(target=watch_dir, args=(_dir_to_watch, q))
dir_watch_thread.start()
asyncio.run(process_lines(q))
是否有更好、更 performant/resource 有效的方法来做到这一点?这甚至是 asyncio.Queue()
的安全用法吗?我读过有关 janus 的内容,它自称是在同步和异步上下文之间通过队列传递数据的安全方式。我是否需要使用这样的数据结构(以及为什么)?如果不需要,我不想包含额外的依赖项。
Is this even a safe usage of
asyncio.Queue
?
不,因为 asyncio.Queue
是 not thread-safe。您甚至可能会观察到这一点,其症状是您从队列中读取的协程不会立即注意到有项目进入,而只会在事件循环中发生不相关的 IO 或超时事件时唤醒。
解决此问题的一种方法是使用 call_soon_threadsafe
:
# this requires you to pass "loop" as well
loop.call_soon_threadsafe(output_queue.put_nowait, line)
更好的方法是使用 asyncio 自己的子进程处理,它可以让你完全避免线程。例如(未经测试):
async def watch_dir(dir_to_watch, output_queue):
proc = await asyncio.create_subprocess_exec(
'sudo', 'inotifywait', '-e', 'create', '-m',
'-r', dir_to_watch, stdout=subprocess.PIPE)
while True:
line = await proc.stdout.readline()
if not line:
break
await output_queue.put(line.rstrip())
async def process_lines(dir_to_watch):
queue = asyncio.Queue()
# run watch_dir() in the "background"
asyncio.create_task(watch_dir(dir_to_watch), queue)
while True:
line = await queue.get()
print(line) # ...
if __name__ == '__main__':
asyncio.run(process_lines(_watch_dir))
在上面的代码中,我用显式参数替换了 shell=True
的使用,以避免 shell 注入的可能性,尤其是与 sudo
.
Is there a better, more performant/resource efficient way to do this?
在简单的单生产者单消费者设置中,您可以取消队列并只使用生成器:
async def watch_dir(dir_to_watch):
proc = await asyncio.create_subprocess_exec(
'sudo', 'inotifywait', '-e', 'create', '-m',
'-r', dir_to_watch, stdout=subprocess.PIPE)
while True:
line = await proc.stdout.readline()
if not line:
break
yield line.rstrip()
async def process_lines(dir_to_watch):
async for line in watch_dir(dir_to_watch):
print(line) # ...