如何流式传输来自 Twisted 服务器的响应?
How stream a response from a Twisted server?
问题
我的问题是我无法编写一个服务器来流式传输我的应用程序发回的响应。
响应不是逐块检索的,而是在迭代器完成迭代时从单个块中检索的。
方法
当我用Request
的write
方法编写响应时,它很好地理解这是我们发送的一个块。
我检查了 Twisted 是否使用了缓冲区大小,但是消息大小检查似乎是在 doWrite
.
中完成的
调试了一段时间,reactor好像最后只读和写了
如果我正确理解反应器如何与 Twisted 一起工作,它会在文件描述符可用时写入和读取。
什么是 Twisted 中的文件描述符?
为什么写完回复后无法使用?
例子
我已经编写了我希望我的服务器看起来像的最小脚本。
它是一个“类似 ASGI”的服务器,运行 是一个应用程序,迭代一个 returns 非常大的字符串的函数:
# async_stream_server.py
import asyncio
from twisted.internet import asyncioreactor
twisted_loop = asyncio.new_event_loop()
asyncioreactor.install(twisted_loop)
import time
from sys import stdout
from twisted.web import http
from twisted.python.log import startLogging
from twisted.internet import reactor, endpoints
CHUNK_SIZE = 2**16
def async_partial(async_fn, *partial_args):
async def wrapped(*args):
return await async_fn(*partial_args, *args)
return wrapped
def iterable_content():
for _ in range(5):
time.sleep(1)
yield b"a" * CHUNK_SIZE
async def application(send):
for part in iterable_content():
await send(
{
"body": part,
"more_body": True,
}
)
await send({"more_body": False})
class Dummy(http.Request):
def process(self):
asyncio.ensure_future(
application(send=async_partial(self.handle_reply)),
loop=asyncio.get_event_loop()
)
async def handle_reply(self, message):
http.Request.write(self, message.get("body", b""))
if not message.get("more_body", False):
http.Request.finish(self)
print('HTTP response chunk')
class DummyFactory(http.HTTPFactory):
def buildProtocol(self, addr):
protocol = http.HTTPFactory.buildProtocol(self, addr)
protocol.requestFactory = Dummy
return protocol
startLogging(stdout)
endpoints.serverFromString(reactor, "tcp:1234").listen(DummyFactory())
asyncio.set_event_loop(reactor._asyncioEventloop)
reactor.run()
要执行这个例子:
- 在终端中,运行:
python async_stream_server.py
- 在另一个终端中,运行:
curl http://localhost:1234/
您需要稍等片刻才能看到完整消息。
详情
$ python --version
Python 3.10.4
$ pip list
Package Version Editable project location
----------------- ------- --------------------------------------------------
asgiref 3.5.0
Twisted 22.4.0
你只需要在上面再洒一些async
。
正如所写,iterable_content
生成器会阻塞反应器,直到它完成生成内容。这就是为什么在完成之前看不到结果的原因。反应堆在完成之前不会重新控制执行。
那只是因为您使用 time.sleep
向其中插入了延迟。 time.sleep
块。这——以及“异步”应用程序中的所有其他内容——实际上是同步的,并且在完成之前保持对执行的控制。
如果您将 iterable_content
替换为真正异步的东西,例如异步生成器:
async def iterable_content():
for _ in range(5):
await asyncio.sleep(1)
yield b"a" * CHUNK_SIZE
然后使用 async for
:
异步迭代它
async def application(send):
async for part in iterable_content():
await send(
{
"body": part,
"more_body": True,
}
)
await send({"more_body": False})
然后反应器有机会在迭代之间 运行 并且服务器开始逐块产生输出。
问题
我的问题是我无法编写一个服务器来流式传输我的应用程序发回的响应。
响应不是逐块检索的,而是在迭代器完成迭代时从单个块中检索的。
方法
当我用Request
的write
方法编写响应时,它很好地理解这是我们发送的一个块。
我检查了 Twisted 是否使用了缓冲区大小,但是消息大小检查似乎是在 doWrite
.
调试了一段时间,reactor好像最后只读和写了
如果我正确理解反应器如何与 Twisted 一起工作,它会在文件描述符可用时写入和读取。
什么是 Twisted 中的文件描述符?
为什么写完回复后无法使用?
例子
我已经编写了我希望我的服务器看起来像的最小脚本。
它是一个“类似 ASGI”的服务器,运行 是一个应用程序,迭代一个 returns 非常大的字符串的函数:
# async_stream_server.py
import asyncio
from twisted.internet import asyncioreactor
twisted_loop = asyncio.new_event_loop()
asyncioreactor.install(twisted_loop)
import time
from sys import stdout
from twisted.web import http
from twisted.python.log import startLogging
from twisted.internet import reactor, endpoints
CHUNK_SIZE = 2**16
def async_partial(async_fn, *partial_args):
async def wrapped(*args):
return await async_fn(*partial_args, *args)
return wrapped
def iterable_content():
for _ in range(5):
time.sleep(1)
yield b"a" * CHUNK_SIZE
async def application(send):
for part in iterable_content():
await send(
{
"body": part,
"more_body": True,
}
)
await send({"more_body": False})
class Dummy(http.Request):
def process(self):
asyncio.ensure_future(
application(send=async_partial(self.handle_reply)),
loop=asyncio.get_event_loop()
)
async def handle_reply(self, message):
http.Request.write(self, message.get("body", b""))
if not message.get("more_body", False):
http.Request.finish(self)
print('HTTP response chunk')
class DummyFactory(http.HTTPFactory):
def buildProtocol(self, addr):
protocol = http.HTTPFactory.buildProtocol(self, addr)
protocol.requestFactory = Dummy
return protocol
startLogging(stdout)
endpoints.serverFromString(reactor, "tcp:1234").listen(DummyFactory())
asyncio.set_event_loop(reactor._asyncioEventloop)
reactor.run()
要执行这个例子:
- 在终端中,运行:
python async_stream_server.py
- 在另一个终端中,运行:
curl http://localhost:1234/
您需要稍等片刻才能看到完整消息。
详情
$ python --version
Python 3.10.4
$ pip list
Package Version Editable project location
----------------- ------- --------------------------------------------------
asgiref 3.5.0
Twisted 22.4.0
你只需要在上面再洒一些async
。
正如所写,iterable_content
生成器会阻塞反应器,直到它完成生成内容。这就是为什么在完成之前看不到结果的原因。反应堆在完成之前不会重新控制执行。
那只是因为您使用 time.sleep
向其中插入了延迟。 time.sleep
块。这——以及“异步”应用程序中的所有其他内容——实际上是同步的,并且在完成之前保持对执行的控制。
如果您将 iterable_content
替换为真正异步的东西,例如异步生成器:
async def iterable_content():
for _ in range(5):
await asyncio.sleep(1)
yield b"a" * CHUNK_SIZE
然后使用 async for
:
async def application(send):
async for part in iterable_content():
await send(
{
"body": part,
"more_body": True,
}
)
await send({"more_body": False})
然后反应器有机会在迭代之间 运行 并且服务器开始逐块产生输出。