将字节从 asyncio StreamReader 泵入文件描述符
Pump bytes from asyncio StreamReader into a file descriptor
我有一个 Python 函数(用 C++ 实现),它从文件描述符(在 C++ 端包装在 FILE*
中)读取,我需要从 asyncio.StreamReader
.具体来说,reader 是 HTTP 响应的内容:aiohttp.ClientResponse.content.
我想我可能 open a pipe, pass the read-end to the C++ function and connect the write-end 到 asyncio
的事件循环。但是,如何通过适当的流量控制和尽可能少的复制将数据从流 reader 移动到管道?
缺少部分的代码框架如下:
# obtain the StreamReader from aiohttp
content = aiohttp_client_response.content
# create a pipe
(pipe_read_fd, pipe_write_fd) = os.pipe()
# now I need a suitable protocol to manage the pipe transport
protocol = ?
(pipe_transport, __) = loop.connect_write_pipe(lambda: protocol, pipe_write_fd)
# the protocol should start reading from `content` and writing into the pipe
return pipe_read_fd
我通过使用 ThreadPoolExecutor
并阻止调用 os.write
:
解决了这个问题
(read_fd, write_fd) = os.pipe()
task_1 = loop.create_task(pump_bytes_into_fd(write_fd))
task_2 = loop.run_in_executor(executor_1, parse_bytes_from_fd(read_fd))
async def pump_bytes_into_fd(write_fd):
while True:
chunk = await stream.read(CHUNK_SIZE)
if chunk is None: break
# process the chunk
await loop.run_in_executor(executor_2, os.write, write_fd, chunk)
使用两个不同的执行器来阻塞读写以避免死锁是至关重要的。
来自 subprocess_attach_write_pipe asyncio 示例:
rfd, wfd = os.pipe()
pipe = open(wfd, 'wb', 0)
transport, _ = await loop.connect_write_pipe(asyncio.Protocol, pipe)
transport.write(b'data')
编辑 - 对于写入流量控制,请参阅以下方法:
这是一个可能的 FlowControl
实现,灵感来自 StreamWriter.drain:
class FlowControl(asyncio.streams.FlowControlMixin):
async def drain(self):
await self._drain_helper()
用法:
transport, protocol = await loop.connect_write_pipe(FlowControl, pipe)
transport.write(b'data')
await protocol.drain()
我有一个 Python 函数(用 C++ 实现),它从文件描述符(在 C++ 端包装在 FILE*
中)读取,我需要从 asyncio.StreamReader
.具体来说,reader 是 HTTP 响应的内容:aiohttp.ClientResponse.content.
我想我可能 open a pipe, pass the read-end to the C++ function and connect the write-end 到 asyncio
的事件循环。但是,如何通过适当的流量控制和尽可能少的复制将数据从流 reader 移动到管道?
缺少部分的代码框架如下:
# obtain the StreamReader from aiohttp
content = aiohttp_client_response.content
# create a pipe
(pipe_read_fd, pipe_write_fd) = os.pipe()
# now I need a suitable protocol to manage the pipe transport
protocol = ?
(pipe_transport, __) = loop.connect_write_pipe(lambda: protocol, pipe_write_fd)
# the protocol should start reading from `content` and writing into the pipe
return pipe_read_fd
我通过使用 ThreadPoolExecutor
并阻止调用 os.write
:
(read_fd, write_fd) = os.pipe()
task_1 = loop.create_task(pump_bytes_into_fd(write_fd))
task_2 = loop.run_in_executor(executor_1, parse_bytes_from_fd(read_fd))
async def pump_bytes_into_fd(write_fd):
while True:
chunk = await stream.read(CHUNK_SIZE)
if chunk is None: break
# process the chunk
await loop.run_in_executor(executor_2, os.write, write_fd, chunk)
使用两个不同的执行器来阻塞读写以避免死锁是至关重要的。
来自 subprocess_attach_write_pipe asyncio 示例:
rfd, wfd = os.pipe()
pipe = open(wfd, 'wb', 0)
transport, _ = await loop.connect_write_pipe(asyncio.Protocol, pipe)
transport.write(b'data')
编辑 - 对于写入流量控制,请参阅以下方法:
这是一个可能的 FlowControl
实现,灵感来自 StreamWriter.drain:
class FlowControl(asyncio.streams.FlowControlMixin):
async def drain(self):
await self._drain_helper()
用法:
transport, protocol = await loop.connect_write_pipe(FlowControl, pipe)
transport.write(b'data')
await protocol.drain()