Python AIOHTTP 返回响应后立即发送请求

Python AIOHTTP send a request right after returning a response

请考虑以下事项。有一个系统使用 HTTP POST 方法请求数据。发送此类请求后,系统立即等待带有状态代码和数据的 HTTP 响应作为单独的消息。现有系统的构建方式不会接受状态代码和数据组合的响应,老实说,这对我来说没有意义。在我这边,我需要实现一个系统,它将接收此类请求并向客户提供数据。我决定使用 AIOHTTP 库来解决这个问题。由于我是 AIOHTTP 的新手,我找不到在 return 响应后立即将数据发送回客户端的方法。发送请求的现有系统也有一个端点。因此,我想做的是 return 向客户端发送带有状态代码的响应,然后作为客户端向提供的端点发送 POST 请求。所以我的系统既可以作为客户端也可以作为服务器。现在,我不明白的是如何使用 AIOHTTP 实现它。

假设我有以下端点和处理程序。请认为这只是伪代码。

async def init():
    app = web.Application()
    app.add_routes([web.post('/endpoint/', handle)])
    app_runner = web.AppRunner(app)
    await app_runner.setup()
    site = web.TCPSite(runner=app_runner, host='127.0.0.1', port=8008)
    await site.start()

async def handle(request):
    data = await request.text()
    result = await process(data)  # Data processing routine. Might be time-consuming.
    await session.post(SERVER_ENDPOINT, data=result)  # Let's say I have session in this block 
                                                      # and I'm sending data back to the client.
    return web.Response(status=200)  # Returning a status without data.

现在,我需要 web.Response(status=200) 尽快发生,然后才处理接收到的数据并将数据发送回客户端。我想到的是将数据处理和请求发送包装在一个任务中,加入到一个队列中。现在,我总是需要首先发送响应,恐怕在使用任务时,这可能并不总是正确的,是吗?任务是否可以在 return 回复之前完成? AIOHTTP 适合这项任务吗?我应该考虑别的吗?

更新 #1

我找到了一种方法 finish_response。它可以用来实现这样的东西吗?

async def handler(self, request):
    self.finish_response(web.Response(status=200))  # Just an example.
    self.session.post(SERVER_ENDPOINT, data=my_data)
    return True  # or something

aiohttp 有一个名为 aiojobs, which is used to handle background tasks. There is an example of how aiojobs integrates with aiohttp in their documentation 的兄弟项目。

因此,修改您的示例以使用 aiojobs:

import aiojobs.aiohttp

async def init():
    app = web.Application()
    app.add_routes([web.post('/endpoint/', handle)])
    # We must setup AIOJobs from AIOHTTP app
    aiojobs.aiohttp.setup(app)
    app_runner = web.AppRunner(app)
    await app_runner.setup()
    site = web.TCPSite(runner=app_runner, host='127.0.0.1', port=8008)
    await site.start()

async def handle(request):
    data = await request.text()
    result = await process(data)  # Data processing routine. Might be time-consuming.
    # Here we create the background task
    await aiojobs.aiohttp.spawn(request, session.post(SERVER_ENDPOINT, data=result))
    # The response should return as soon as the task is created - it does not wait for the task to finish.
    return web.Response(status=200)  # Returning a status without data.

如果您希望 await process(data) 也被安排为任务,那么您可以将这两个调用移动到一个单独的函数中,并将它们一起安排:

async def push_to_server(data):
  result = await process(data)
  await session.post(SERVER_ENDPOINT, data=result))

async def handle(request):
    data = await request.text()
    await aiojobs.aiohttp.spawn(request, push_to_server(data))
    return web.Response(status=200)

如果你想确保在调用 push_to_server 协程之前发送响应,那么你可以使用异步事件:

import asyncio

async def push_to_server(data, start):
  await start.wait()
  result = await process(data)
  await session.post(SERVER_ENDPOINT, data=result))

async def handle(request):
    data = await request.text()
    start = asyncio.Event()
    await aiojobs.aiohttp.spawn(request, push_to_server(data, start))
    response = web.Response(status=200)
    await response.prepare(request)
    await response.write_eof()
    start.set()
    return response

这里,await response.prepare(request)await response.write_eof()只是发送响应的冗长方式,但允许我们在之后调用start.set(),这将触发push_to_server 正在等待该事件的功能 (await start.wait())。