三重奏:多个任务从同一个 fd 读取
Trio: multiple tasks reading from the same fd
我有一个文件描述符,我想通过多个任务从中读取。 fd 上的每个 read() 请求都会 return 一个完整的独立数据包(只要数据可用)。
我天真的实现是让每个工人 运行 执行以下循环:
async def work_loop(fd):
while True:
await trio.hazmat.wait_readable(fd)
buf = os.read(fd, BUFSIZE)
if not buf:
break
await do_work(buf)
不幸的是,这不起作用,因为如果多个任务阻塞在同一个 fd 上,trio 会引发 ResourceBusyError
。所以我的下一次迭代是编写自定义等待函数:
async def work_loop(fd):
while True:
await my_wait_readable(fd)
buf = os.read(fd, BUFSIZE)
if not buf:
break
await do_work(buf)
哪里
read_queue = trio.hazmat.ParkingLot()
async def my_wait_readable():
if name is None:
name = trio.hazmat.current_task().name
while True:
try:
log.debug('%s: Waiting for fd to become readable...', name)
await trio.hazmat.wait_readable(fd)
except trio.ResourceBusyError:
log.debug('%s: Resource busy, parking in read queue.', name)
await read_queue.park()
continue
log.debug('%s: fd readable, unparking next task.', name)
read_queue.unpark()
break
但是,在测试中,我收到如下 og 消息:
2018-09-18 13:09:17.219 pyfuse3-worker-37: Waiting for fd to become readable...
2018-09-18 13:09:17.219 pyfuse3-worker-47: Waiting for fd to become readable...
2018-09-18 13:09:17.220 pyfuse3-worker-53: Waiting for fd to become readable...
2018-09-18 13:09:17.220 pyfuse3-worker-51: fd readable, unparking next task.
2018-09-18 13:09:17.220 pyfuse3-worker-51: doing work
2018-09-18 13:09:17.221 pyfuse3-worker-47: Resource busy, parking in read queue.
2018-09-18 13:09:17.221 pyfuse3-worker-37: Resource busy, parking in read queue.
2018-09-18 13:09:17.221 pyfuse3-worker-53: Resource busy, parking in read queue.
换句话说:
- 所有任务进入
trio.hazmat.wait_readable
- 一个任务 return 成功并尝试取消下一个任务(但还有 none)
- 其他任务收到 BusyError 并自行停放
- 没有任何事情发生,因为所有工作人员都已停好
解决这个问题的正确方法是什么?
来自同一个 fd 的多个读者没有意义,使用 Trio(或不使用)不会改变这个基本事实。为什么你首先要尝试这样做?
如果出于某种原因您确实确实需要并行多个任务来 post- 处理您的数据,请使用一个读取任务将数据添加到队列中,然后让您的处理任务从中获取数据。
或者,您可以使用锁:
read_lock = trio.Lock()
async def work_loop(fd):
while True:
async with read_lock:
await trio.hazmat.wait_readable(fd)
buf = os.read(fd, BUFSIZE)
if not buf:
break
await do_work(buf)
我有一个文件描述符,我想通过多个任务从中读取。 fd 上的每个 read() 请求都会 return 一个完整的独立数据包(只要数据可用)。
我天真的实现是让每个工人 运行 执行以下循环:
async def work_loop(fd):
while True:
await trio.hazmat.wait_readable(fd)
buf = os.read(fd, BUFSIZE)
if not buf:
break
await do_work(buf)
不幸的是,这不起作用,因为如果多个任务阻塞在同一个 fd 上,trio 会引发 ResourceBusyError
。所以我的下一次迭代是编写自定义等待函数:
async def work_loop(fd):
while True:
await my_wait_readable(fd)
buf = os.read(fd, BUFSIZE)
if not buf:
break
await do_work(buf)
哪里
read_queue = trio.hazmat.ParkingLot()
async def my_wait_readable():
if name is None:
name = trio.hazmat.current_task().name
while True:
try:
log.debug('%s: Waiting for fd to become readable...', name)
await trio.hazmat.wait_readable(fd)
except trio.ResourceBusyError:
log.debug('%s: Resource busy, parking in read queue.', name)
await read_queue.park()
continue
log.debug('%s: fd readable, unparking next task.', name)
read_queue.unpark()
break
但是,在测试中,我收到如下 og 消息:
2018-09-18 13:09:17.219 pyfuse3-worker-37: Waiting for fd to become readable...
2018-09-18 13:09:17.219 pyfuse3-worker-47: Waiting for fd to become readable...
2018-09-18 13:09:17.220 pyfuse3-worker-53: Waiting for fd to become readable...
2018-09-18 13:09:17.220 pyfuse3-worker-51: fd readable, unparking next task.
2018-09-18 13:09:17.220 pyfuse3-worker-51: doing work
2018-09-18 13:09:17.221 pyfuse3-worker-47: Resource busy, parking in read queue.
2018-09-18 13:09:17.221 pyfuse3-worker-37: Resource busy, parking in read queue.
2018-09-18 13:09:17.221 pyfuse3-worker-53: Resource busy, parking in read queue.
换句话说:
- 所有任务进入
trio.hazmat.wait_readable
- 一个任务 return 成功并尝试取消下一个任务(但还有 none)
- 其他任务收到 BusyError 并自行停放
- 没有任何事情发生,因为所有工作人员都已停好
解决这个问题的正确方法是什么?
来自同一个 fd 的多个读者没有意义,使用 Trio(或不使用)不会改变这个基本事实。为什么你首先要尝试这样做?
如果出于某种原因您确实确实需要并行多个任务来 post- 处理您的数据,请使用一个读取任务将数据添加到队列中,然后让您的处理任务从中获取数据。
或者,您可以使用锁:
read_lock = trio.Lock()
async def work_loop(fd):
while True:
async with read_lock:
await trio.hazmat.wait_readable(fd)
buf = os.read(fd, BUFSIZE)
if not buf:
break
await do_work(buf)