从 Django Channels 内部联系另一个 WebSocket 服务器
Contacting another WebSocket server from inside Django Channels
我有两个 websocket 服务器,称它们为 Main 和 Worker,这是所需的工作流程:
- 客户端向 Main 发送消息
- Main 向 Worker 发送消息
- Worker 响应 Main
- 主要响应客户端
这可行吗?我在 Channels 中找不到任何 WS 客户端功能。我天真地尝试这样做(在 consumers.py
):
import websockets
class SampleConsumer(AsyncWebsocketConsumer):
async def receive(self, text_data):
async with websockets.connect(url) as worker_ws:
await worker_ws.send(json.dumps({ 'to': 'Worker' }))
result = json.loads(await worker_ws.recv())
await self.send(text_data=json.dumps({ 'to': 'Client' })
但是,with
部分似乎阻塞了(在从 Worker 收到响应之前,Main 似乎不接受任何进一步的消息)。我怀疑这是因为 websockets
运行 有它自己的循环,但我不确定。 (编辑:我比较了 id(asyncio.get_running_loop())
,它似乎是同一个循环。我不知道为什么它会阻塞。)
response{ "to": "Client" }
不需要在这里,我换个method也行,只要收到Worker的response就触发
有没有办法做到这一点,还是我找错树了?
如果没有办法做到这一点,我在想有一个线程(或进程?或一个单独的应用程序?)与Worker通信,并使用channel_layer
与Main通信。这可行吗?如果我能得到确认(对于代码示例更是如此),我将不胜感激。
EDIT 我想我知道发生了什么(虽然仍在调查),但是——我相信来自客户端的一个连接实例化一个消费者,虽然不同的实例都可以 运行 同时,在一个消费者实例中,该实例似乎不允许在一个方法完成之前启动第二个方法。这个对吗?现在看看将请求和等待响应代码移动到线程中是否可行。
是的,Django Channels 不提供 websocket 客户端,因为它主要用作服务器。
从你的代码来看,你似乎并不真的需要 Main 和 Worker 之间的 websocket 通信,因为你只是启动一个套接字,发送一条消息,接收响应并关闭套接字。这是常规 HTTP 的经典用例,因此如果您真的不需要保持连接有效,我建议您改用常规 HTTP 端点并使用 aioHTTP 作为客户端。
但是,如果您确实需要客户端,那么您应该在客户端连接时打开一次套接字,并在客户端断开连接时关闭它。你可以这样做。
import websockets
async def create_ws(on_create, on_message):
uri = "wss://localhost:8765"
async with websockets.connect(uri) as websocket:
await on_create(websocket)
while True:
message = await websocket.recv()
if message:
await on_message(message)
class WebsocketClient:
asyn def __init__(self, channel):
self.channel = channel
self.ws = None
await creat_ws(self.on_message)
async def on_create(self, was):
self.ws = ws
async def on_message(self, ws, message):
await self.channel.send(text_data=json.dumps(message)
async def send(self, message):
self.ws.send(message)
asunc def close(self):
self.ws.close()
然后在你的consumer中,你可以使用客户端如下:
class SampleConsumer(AsyncWebsocketConsumer):
async def connect(self):
self.ws_client = WebsocketClient(self)
async def receive(self, text_data):
await self.ws_client.send(text_data)
async def disconnect(self, code):
await self.ws_client.close()
看来我使用我发布的最新想法设法做到了 — 启动一个线程来处理与 Worker 的连接。像这样:
class SampleConsumer(AsyncWebsocketConsumer):
async def receive(self, text_data):
threading.Thread(
target=asyncio.run,
args=(self.talk_to_worker(
url,
{ 'to': 'Worker' },
),)
).start()
async def talk_to_worker(self, url, message):
async with websockets.connect(url) as worker_ws:
await worker_ws.send(json.dumps(message))
result = json.loads(await worker_ws.recv())
await self.send(text_data=json.dumps({ 'to': 'Client' })
在每个方向上使用 HTTP 请求实际上可能更聪明(因为两个端点都可以是 HTTP 服务器),但这似乎可行。
每当我从另一个 WebSocket 服务器收到消息时,我都想在我的 Django 应用程序中处理消息。
我想到了使用 Django 论坛上的 WebSockets client library and keeping it running as a separate process using the manage.py
command from this post。
您可以定义一个异步协程client(websocket_url)
来监听从 WebSocket 服务器接收到的消息。
import asyncio
import websockets
async def client(websocket_url):
async for websocket in websockets.connect(uri):
print("Connected to Websocket server")
try:
async for message in websocket:
# Process message received on the connection.
print(message)
except websockets.ConnectionClosed:
print("Connection lost! Retrying..")
continue #continue will retry websocket connection by exponential back off
在上面的代码中,connect()
充当无限异步迭代器。更多关于 here.
你可以运行自定义管理命令class.
的handle()
方法里面的上述协程
runwsclient.py
from django.core.management.base import BaseCommand
class Command(BaseCommand):
def handle(self, *args, **options):
URL = "ws://example.com/messages"
print(f"Connecting to websocket server {URL}")
asyncio.run(client(URL))
最后,运行 manage.py 命令。
python manage.py runwsclient
您还可以将处理程序传递给 client(ws_url, msg_handler)
,它将处理消息,以便处理逻辑保留在客户端之外。
2022 年 5 月 31 日更新:
我创建了一个 django 包以将上述功能与最小设置集成在一起:django-websocketclient
我有两个 websocket 服务器,称它们为 Main 和 Worker,这是所需的工作流程:
- 客户端向 Main 发送消息
- Main 向 Worker 发送消息
- Worker 响应 Main
- 主要响应客户端
这可行吗?我在 Channels 中找不到任何 WS 客户端功能。我天真地尝试这样做(在 consumers.py
):
import websockets
class SampleConsumer(AsyncWebsocketConsumer):
async def receive(self, text_data):
async with websockets.connect(url) as worker_ws:
await worker_ws.send(json.dumps({ 'to': 'Worker' }))
result = json.loads(await worker_ws.recv())
await self.send(text_data=json.dumps({ 'to': 'Client' })
但是,with
部分似乎阻塞了(在从 Worker 收到响应之前,Main 似乎不接受任何进一步的消息)。我怀疑这是因为 websockets
运行 有它自己的循环,但我不确定。 (编辑:我比较了 id(asyncio.get_running_loop())
,它似乎是同一个循环。我不知道为什么它会阻塞。)
response{ "to": "Client" }
不需要在这里,我换个method也行,只要收到Worker的response就触发
有没有办法做到这一点,还是我找错树了?
如果没有办法做到这一点,我在想有一个线程(或进程?或一个单独的应用程序?)与Worker通信,并使用channel_layer
与Main通信。这可行吗?如果我能得到确认(对于代码示例更是如此),我将不胜感激。
EDIT 我想我知道发生了什么(虽然仍在调查),但是——我相信来自客户端的一个连接实例化一个消费者,虽然不同的实例都可以 运行 同时,在一个消费者实例中,该实例似乎不允许在一个方法完成之前启动第二个方法。这个对吗?现在看看将请求和等待响应代码移动到线程中是否可行。
是的,Django Channels 不提供 websocket 客户端,因为它主要用作服务器。 从你的代码来看,你似乎并不真的需要 Main 和 Worker 之间的 websocket 通信,因为你只是启动一个套接字,发送一条消息,接收响应并关闭套接字。这是常规 HTTP 的经典用例,因此如果您真的不需要保持连接有效,我建议您改用常规 HTTP 端点并使用 aioHTTP 作为客户端。
但是,如果您确实需要客户端,那么您应该在客户端连接时打开一次套接字,并在客户端断开连接时关闭它。你可以这样做。
import websockets
async def create_ws(on_create, on_message):
uri = "wss://localhost:8765"
async with websockets.connect(uri) as websocket:
await on_create(websocket)
while True:
message = await websocket.recv()
if message:
await on_message(message)
class WebsocketClient:
asyn def __init__(self, channel):
self.channel = channel
self.ws = None
await creat_ws(self.on_message)
async def on_create(self, was):
self.ws = ws
async def on_message(self, ws, message):
await self.channel.send(text_data=json.dumps(message)
async def send(self, message):
self.ws.send(message)
asunc def close(self):
self.ws.close()
然后在你的consumer中,你可以使用客户端如下:
class SampleConsumer(AsyncWebsocketConsumer):
async def connect(self):
self.ws_client = WebsocketClient(self)
async def receive(self, text_data):
await self.ws_client.send(text_data)
async def disconnect(self, code):
await self.ws_client.close()
看来我使用我发布的最新想法设法做到了 — 启动一个线程来处理与 Worker 的连接。像这样:
class SampleConsumer(AsyncWebsocketConsumer):
async def receive(self, text_data):
threading.Thread(
target=asyncio.run,
args=(self.talk_to_worker(
url,
{ 'to': 'Worker' },
),)
).start()
async def talk_to_worker(self, url, message):
async with websockets.connect(url) as worker_ws:
await worker_ws.send(json.dumps(message))
result = json.loads(await worker_ws.recv())
await self.send(text_data=json.dumps({ 'to': 'Client' })
在每个方向上使用 HTTP 请求实际上可能更聪明(因为两个端点都可以是 HTTP 服务器),但这似乎可行。
每当我从另一个 WebSocket 服务器收到消息时,我都想在我的 Django 应用程序中处理消息。
我想到了使用 Django 论坛上的 WebSockets client library and keeping it running as a separate process using the manage.py
command from this post。
您可以定义一个异步协程client(websocket_url)
来监听从 WebSocket 服务器接收到的消息。
import asyncio
import websockets
async def client(websocket_url):
async for websocket in websockets.connect(uri):
print("Connected to Websocket server")
try:
async for message in websocket:
# Process message received on the connection.
print(message)
except websockets.ConnectionClosed:
print("Connection lost! Retrying..")
continue #continue will retry websocket connection by exponential back off
在上面的代码中,connect()
充当无限异步迭代器。更多关于 here.
你可以运行自定义管理命令class.
的handle()
方法里面的上述协程
runwsclient.py
from django.core.management.base import BaseCommand
class Command(BaseCommand):
def handle(self, *args, **options):
URL = "ws://example.com/messages"
print(f"Connecting to websocket server {URL}")
asyncio.run(client(URL))
最后,运行 manage.py 命令。
python manage.py runwsclient
您还可以将处理程序传递给 client(ws_url, msg_handler)
,它将处理消息,以便处理逻辑保留在客户端之外。
2022 年 5 月 31 日更新:
我创建了一个 django 包以将上述功能与最小设置集成在一起:django-websocketclient