如何向多个消费者广播 asyncio StreamReader?
How can I broadcast asyncio StreamReader to several consumers?
我正在尝试使用 aiohttp 制作一种高级反向代理。
我想获取 HTTP 请求的内容并将其传递给新的 HTTP 请求而不将其拉入内存。虽然只有上游,但任务相当简单:aiohttp 服务器 returns 请求内容作为 StreamReader
并且 aiohttp 客户端可以接受 StreamReader
作为请求主体。
问题是我想向多个上游发送原始请求,或者,例如,同时向上游发送内容并将其写入磁盘。
有没有什么工具可以播放StreamReader
的内容?
我尝试制作一些简单的广播器,但它在大对象上失败了。我做错了什么?
class StreamBroadcast:
async def __do_broadcast(self):
while True:
chunk = await self.__source.read(self.__n)
if not chunk:
break
for output in self.__sinks:
output.feed_data(chunk)
for output in self.__sinks:
output.feed_eof()
def __init__(self, source: StreamReader, sinks_count: int, n: int = -1):
self.__source = source
self.__n = n
self.__sinks = [StreamReader() for i in range(sinks_count)]
self.__task = asyncio.create_task(self.__do_broadcast())
@property
def sinks(self) -> Iterable[StreamReader]:
return self.__sinks
@property
def ready(self) -> Task:
return self.__task
好吧,我查看了 asyncio
来源并发现我应该使用 Transport
通过流泵送数据。这是我的解决方案。
import asyncio
from asyncio import StreamReader, StreamWriter, ReadTransport, StreamReaderProtocol
from typing import Iterable
class _BroadcastReadTransport(ReadTransport):
"""
Internal class, is not meant to be instantiated manually
"""
def __init__(self, source: StreamReader, sinks: Iterable[StreamReader]):
super().__init__()
self.__source = source
self.__sinks = tuple(StreamReaderProtocol(s) for s in sinks)
for sink in sinks:
sink.set_transport(self)
self.__waiting_for_data = len(self.__sinks)
asyncio.create_task(self.__broadcast_next_chunk(), name='initial-chunk-broadcast')
def is_reading(self):
return self.__waiting_for_data == len(self.__sinks)
def pause_reading(self):
self.__waiting_for_data -= 1
async def __broadcast_next_chunk(self):
data = await self.__source.read()
if data:
for sink in self.__sinks:
sink.data_received(data)
if self.is_reading():
asyncio.create_task(self.__broadcast_next_chunk())
else:
for sink in self.__sinks:
sink.eof_received()
def resume_reading(self):
self.__waiting_for_data += 1
if self.__waiting_for_data == len(self.__sinks):
asyncio.create_task(self.__broadcast_next_chunk(), name='chunk-broadcast')
@property
def is_completed(self):
return self.__source.at_eof()
class StreamBroadcast:
def __init__(self, source: StreamReader, sinks_count: int):
self.__source = source
self.__sinks = tuple(StreamReader() for _ in range(sinks_count))
self.__transport = _BroadcastReadTransport(self.__source, self.__sinks)
@property
def sinks(self) -> Iterable[StreamReader]:
return self.__sinks
@property
def is_completed(self):
return self.__transport.is_completed
希望一旦我将它打包到 pip 模块。
我正在尝试使用 aiohttp 制作一种高级反向代理。
我想获取 HTTP 请求的内容并将其传递给新的 HTTP 请求而不将其拉入内存。虽然只有上游,但任务相当简单:aiohttp 服务器 returns 请求内容作为 StreamReader
并且 aiohttp 客户端可以接受 StreamReader
作为请求主体。
问题是我想向多个上游发送原始请求,或者,例如,同时向上游发送内容并将其写入磁盘。
有没有什么工具可以播放StreamReader
的内容?
我尝试制作一些简单的广播器,但它在大对象上失败了。我做错了什么?
class StreamBroadcast:
async def __do_broadcast(self):
while True:
chunk = await self.__source.read(self.__n)
if not chunk:
break
for output in self.__sinks:
output.feed_data(chunk)
for output in self.__sinks:
output.feed_eof()
def __init__(self, source: StreamReader, sinks_count: int, n: int = -1):
self.__source = source
self.__n = n
self.__sinks = [StreamReader() for i in range(sinks_count)]
self.__task = asyncio.create_task(self.__do_broadcast())
@property
def sinks(self) -> Iterable[StreamReader]:
return self.__sinks
@property
def ready(self) -> Task:
return self.__task
好吧,我查看了 asyncio
来源并发现我应该使用 Transport
通过流泵送数据。这是我的解决方案。
import asyncio
from asyncio import StreamReader, StreamWriter, ReadTransport, StreamReaderProtocol
from typing import Iterable
class _BroadcastReadTransport(ReadTransport):
"""
Internal class, is not meant to be instantiated manually
"""
def __init__(self, source: StreamReader, sinks: Iterable[StreamReader]):
super().__init__()
self.__source = source
self.__sinks = tuple(StreamReaderProtocol(s) for s in sinks)
for sink in sinks:
sink.set_transport(self)
self.__waiting_for_data = len(self.__sinks)
asyncio.create_task(self.__broadcast_next_chunk(), name='initial-chunk-broadcast')
def is_reading(self):
return self.__waiting_for_data == len(self.__sinks)
def pause_reading(self):
self.__waiting_for_data -= 1
async def __broadcast_next_chunk(self):
data = await self.__source.read()
if data:
for sink in self.__sinks:
sink.data_received(data)
if self.is_reading():
asyncio.create_task(self.__broadcast_next_chunk())
else:
for sink in self.__sinks:
sink.eof_received()
def resume_reading(self):
self.__waiting_for_data += 1
if self.__waiting_for_data == len(self.__sinks):
asyncio.create_task(self.__broadcast_next_chunk(), name='chunk-broadcast')
@property
def is_completed(self):
return self.__source.at_eof()
class StreamBroadcast:
def __init__(self, source: StreamReader, sinks_count: int):
self.__source = source
self.__sinks = tuple(StreamReader() for _ in range(sinks_count))
self.__transport = _BroadcastReadTransport(self.__source, self.__sinks)
@property
def sinks(self) -> Iterable[StreamReader]:
return self.__sinks
@property
def is_completed(self):
return self.__transport.is_completed
希望一旦我将它打包到 pip 模块。