从 Django Channels 内部联系另一个 WebSocket 服务器

Contacting another WebSocket server from inside Django Channels

我有两个 websocket 服务器,称它们为 Main 和 Worker,这是所需的工作流程:

这可行吗?我在 Channels 中找不到任何 WS 客户端功能。我天真地尝试这样做(在 consumers.py):

import websockets

class SampleConsumer(AsyncWebsocketConsumer):
    async def receive(self, text_data):
        async with websockets.connect(url) as worker_ws:
            await worker_ws.send(json.dumps({ 'to': 'Worker' }))
            result = json.loads(await worker_ws.recv())
        await self.send(text_data=json.dumps({ 'to': 'Client' })

但是,with 部分似乎阻塞了(在从 Worker 收到响应之前,Main 似乎不接受任何进一步的消息)。我怀疑这是因为 websockets 运行 有它自己的循环,但我不确定。 (编辑:我比较了 id(asyncio.get_running_loop()) ,它似乎是同一个循环。我不知道为什么它会阻塞。)

response{ "to": "Client" }不需要在这里,我换个method也行,只要收到Worker的response就触发

有没有办法做到这一点,还是我找错树了?

如果没有办法做到这一点,我在想有一个线程(或进程?或一个单独的应用程序?)与Worker通信,并使用channel_layer与Main通信。这可行吗?如果我能得到确认(对于代码示例更是如此),我将不胜感激。

EDIT 我想我知道发生了什么(虽然仍在调查),但是——我相信来自客户端的一个连接实例化一个消费者,虽然不同的实例都可以 运行 同时,在一个消费者实例中,该实例似乎不允许在一个方法完成之前启动第二个方法。这个对吗?现在看看将请求和等待响应代码移动到线程中是否可行。

是的,Django Channels 不提供 websocket 客户端,因为它主要用作服务器。 从你的代码来看,你似乎并不真的需要 Main 和 Worker 之间的 websocket 通信,因为你只是启动一个套接字,发送一条消息,接收响应并关闭套接字。这是常规 HTTP 的经典用例,因此如果您真的不需要保持连接有效,我建议您改用常规 HTTP 端点并使用 aioHTTP 作为客户端。

但是,如果您确实需要客户端,那么您应该在客户端连接时打开一次套接字,并在客户端断开连接时关闭它。你可以这样做。

import websockets

async def create_ws(on_create, on_message):
    uri = "wss://localhost:8765"
    async with websockets.connect(uri) as websocket:
        await on_create(websocket)
        while True:
            message = await websocket.recv()
            if message:
                await on_message(message)



class WebsocketClient:
    asyn def __init__(self, channel):
        self.channel = channel
        self.ws = None
        await creat_ws(self.on_message)
    
    async def on_create(self, was):
        self.ws = ws

    async def on_message(self, ws, message):
        await self.channel.send(text_data=json.dumps(message)
    
    async def send(self, message):
        self.ws.send(message)

    asunc def close(self):
        self.ws.close()

然后在你的consumer中,你可以使用客户端如下:

class SampleConsumer(AsyncWebsocketConsumer):
    async def connect(self):
        self.ws_client = WebsocketClient(self)
    
    async def receive(self, text_data):
        await self.ws_client.send(text_data)
    
    async def disconnect(self, code):
        await self.ws_client.close()

看来我使用我发布的最新想法设法做到了 — 启动一个线程来处理与 Worker 的连接。像这样:

class SampleConsumer(AsyncWebsocketConsumer):
    async def receive(self, text_data):
        threading.Thread(
            target=asyncio.run,
            args=(self.talk_to_worker(
                url,
                { 'to': 'Worker' },
            ),)
        ).start()

    async def talk_to_worker(self, url, message):
        async with websockets.connect(url) as worker_ws:
            await worker_ws.send(json.dumps(message))
            result = json.loads(await worker_ws.recv())
        await self.send(text_data=json.dumps({ 'to': 'Client' })

在每个方向上使用 HTTP 请求实际上可能更聪明(因为两个端点都可以是 HTTP 服务器),但这似乎可行。

每当我从另一个 WebSocket 服务器收到消息时,我都想在我的 Django 应用程序中处理消息。

我想到了使用 Django 论坛上的 WebSockets client library and keeping it running as a separate process using the manage.py command from this post。

您可以定义一个异步协程client(websocket_url)来监听从 WebSocket 服务器接收到的消息。

import asyncio
import websockets


async def client(websocket_url):
    async for websocket in websockets.connect(uri):
        print("Connected to Websocket server")
        try:
            async for message in websocket:
            # Process message received on the connection.
                print(message)
        except websockets.ConnectionClosed:
            print("Connection lost! Retrying..")
            continue #continue will retry websocket connection by exponential back off 

在上面的代码中,connect() 充当无限异步迭代器。更多关于 here.

你可以运行自定义管理命令class.

handle()方法里面的上述协程

runwsclient.py

from django.core.management.base import BaseCommand

class Command(BaseCommand):

    def handle(self, *args, **options):
        URL = "ws://example.com/messages"
        print(f"Connecting to websocket server {URL}")
        asyncio.run(client(URL))

最后,运行 manage.py 命令。

python manage.py runwsclient

您还可以将处理程序传递给 client(ws_url, msg_handler),它将处理消息,以便处理逻辑保留在客户端之外。

2022 年 5 月 31 日更新

我创建了一个 django 包以将上述功能与最小设置集成在一起:django-websocketclient