如何使用 Django Channels 并行处理 WebSocket 消息?

How to process WebSocket messages in parallel using Django Channels?

我们开始使用 Django Channels 并正在努力处理以下用例:

我们的应用程序在短时间内收到来自单个客户端(另一个服务器)的多个请求。创建每个 响应需要很长时间 订单 将响应发送到客户端无关紧要

我们希望保持 打开 WebSocket 连接 以减少从同一客户端发送许多请求和响应的连接开销。

Django Channels 似乎严格按顺序处理同一 WebSocket 连接上的消息,并且 不会在前一帧完成之前开始处理下一帧回复.

考虑以下示例:

例子

服务器端

import asyncio
from channels.generic.websocket import AsyncWebsocketConsumer

class QuestionConsumer(AsyncWebsocketConsumer):
    async def websocket_connect(self, event):
        await self.accept()

    async def complicated_answer(self, question):
        await asyncio.sleep(3)
        return {
             "What is the Answer to Life, The Universe and Everything?": "42",
             "Why?": "Because.",
        }.get(question, "Don't know")


    async def receive(self, text_data=None, bytes_data=None):
        # while awaiting below, we should start processing the next WS frame
        answer = await self.complicated_answer(text_data)
        await self.send(answer)

asgi.py:

from django.urls import re_path
from channels.routing import ProtocolTypeRouter, URLRouter
application = ProtocolTypeRouter(
    {"websocket": URLRouter([
        re_path(r"^questions", QuestionConsumer.as_asgi(), name="questions",)
    ]}
  )
)

客户端

import asyncio
import websockets
from time import time

async def main():
    async with websockets.connect("ws://0.0.0.0:8000/questions") as ws:
        tasks = []
        for m in [
                "What is the Answer to Life, The Universe and Everything?",
                "Why?"
        ]:
            tasks.append(ws.send(m))
        # send all requests (without waiting for response)
        time_before = time()
        await asyncio.gather(*tasks)
        # wait for responses
        for t in tasks:
            print(await ws.recv())
            print("{:.1f} seconds since first request".format(time() - time_before))
asyncio.get_event_loop().run_until_complete(main())

结果

实际

42
3.0 seconds since first request
Because.
6.0 seconds since first request

需要

42
3.0 seconds since first request
Because.
3.0 seconds since first request

换句话说,我们希望事件循环 在异步任务之间切换 不仅针对多个消费者,而且 对同一处理的所有任务消费者这可能吗,或者是否有我们忽略的解决方法?您是否使用 Django Channels 应对过类似的挑战?您是如何解决这些挑战的?

对于每个传入的 WebSocket 消息,都会依次调用消费者的 receive 函数,并且当到达第一个接收的 await 时,不会为第二个消息调用接收方法,因此将上下文切换到第二个协程尚不可能。我找不到这方面的来源,但我猜这是 ASGI 协议本身的一部分。对于许多用例,可能需要严格按照接收顺序处理 WebSocket 消息。

异步处理消息的解决方案是不发送来自receive方法的响应,而是发送来自通过loop.create_task调度的协程的响应。

安排生成响应的长 运行 协程允许 receive 完成,并允许下一个 receive 开始。一旦第二条消息的响应生成被调度,两个协程将被调度,解释器可以切换上下文以异步执行它们。

对于问题中的例子,这是我找到的解决方案:

class QuestionConsumer(AsyncWebsocketConsumer):

    async def complicated_answer(self, question):
        await asyncio.sleep(3)
        answer = {
             "What is the Answer to Life, The Universe and Everything?": "42",
             "Why?": "Because.",
        }.get(question, "Don't know")
        # instead of returning the answer, send it directly to client as a response
        await self.send(answer)


    async def receive(self, text_data=None, bytes_data=None):
        # instead of awaiting, schedule the coroutine
        loop = asyncio.get_running_loop()
        loop.create_task(
            self.complicated_answer(text_data)
        )

这个改变后的消费者的输出与问题给出的期望输出相匹配。请注意,响应可能会乱序返回,客户端负责将请求与响应相匹配。

请注意,对于 Python 版本 <3.7,应使用 get_event_loop 而不是 get_running_loop