Django Channels: WebSocket messages are not sent in production
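I have a Django server that uses WebSockets to send real-time updates to web clients. This works perfectly fine locally (using manage.py runserver), but in production most messages are simply never sent. I test this by opening two browsers and making a change in one, which should then show up in the other. As I said, this all works locally but not in production. In production some WebSocket messages are sent by the server and received by the web client, maybe 20% or so; the rest are never sent at all.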
# /websockets/__init__.py
import logging

from asgiref.sync import async_to_sync
from channels.layers import get_channel_layer
from djangorestframework_camel_case.util import camelize

logger = logging.getLogger("django.server.ws.critical-notes")


def ws_send(model_type, action, model_data, user_ids):
    logger.info(f"Called ws_send for model {model_type}, action {action}, user_ids: {user_ids}")
    channel_layer = get_channel_layer()
    for user_id in user_ids:
        group_name = f"user-{user_id}"
        async_to_sync(channel_layer.group_send)(
            group_name,
            {
                "type": "send.data",  # this calls Consumer.send_data
                "data": {"type": model_type, "action": action, "model": camelize(model_data)},
            },
        )
# /websockets/consumers.py
import logging

from channels.generic.websocket import AsyncJsonWebsocketConsumer
from channels.db import database_sync_to_async
from django.db import close_old_connections
from knox.auth import TokenAuthentication

logger = logging.getLogger("django.server.ws.critical-notes")


class Consumer(AsyncJsonWebsocketConsumer):
    def __init__(self, *args, **kwargs):
        # Unpack the arguments so the base class receives them unchanged
        super().__init__(*args, **kwargs)
        self.group_name = None

    @database_sync_to_async
    def get_user(self, token):
        try:
            auth = TokenAuthentication()
            user, auth_token = auth.authenticate_credentials(token.encode("utf-8"))
            return user
        except Exception as e:
            logger.warning("User not found: %s", e)
            return None

    # User can authenticate by sending a JSON message containing a token
    async def receive_json(self, content, **kwargs):
        token = content.get("token")  # None if the message carries no token
        if token is None:
            return
        user = await self.get_user(token)
        close_old_connections()
        if user is not None:
            self.group_name = f"user-{user.pk}"
            logger.info(f"WS connected to {self.group_name}")
            await self.channel_layer.group_add(self.group_name, self.channel_name)

    # This gets called by /websockets/__init__.py sending an event with type "send.data"
    async def send_data(self, event):
        data = event["data"]
        logger.info(f"Sending event to WS user {self.group_name}: {data}")
        await self.send_json(content=data)

    # Clean up on disconnect
    async def disconnect(self, close_code):
        if self.group_name is not None:
            logger.info(f"WS disconnected: {self.group_name}")
            await self.channel_layer.group_discard(self.group_name, self.channel_name)
        await self.close()
# /websockets/routing.py
from django.urls import re_path
from .consumers import Consumer
websocket_urlpatterns = [
    re_path(r"ws", Consumer.as_asgi()),
]
With all of this in place, I can do something like this in my Django views and models:
from websockets import ws_send
ws_send("model_name", "create", data, user_ids)
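This lets those user_ids know that a model was created with the given data.
When I look at the server logs, I can see that "Called ws_send for model" is logged every time, but the "Sending event to WS user" log message is missing.
Some server setup info: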
# settings.py
ASGI_APPLICATION = "criticalnotes.asgi.application"
WSGI_APPLICATION = "criticalnotes.wsgi.application"
CHANNEL_LAYERS = {"default": {"BACKEND": "channels.layers.InMemoryChannelLayer"}}
# asgi.py
import os
import django
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "criticalnotes.settings")
django.setup()
from channels.routing import ProtocolTypeRouter, URLRouter
from django.core.asgi import get_asgi_application
import criticalnotes.websockets.routing
application = ProtocolTypeRouter(
    {
        "http": get_asgi_application(),
        "websocket": URLRouter(criticalnotes.websockets.routing.websocket_urlpatterns),
    }
)
I'm using Nginx with uvicorn on the server:
# /etc/systemd/system/api.critical-notes.com.service
[Unit]
Description=api.critical-notes.com
[Service]
User=criticalnotes
Group=criticalnotes
Environment="PYTHONPATH=/opt/api"
ExecStart=/opt/api/env/bin/uvicorn criticalnotes.asgi:application --log-level warning --workers 4 --uds /tmp/uvicorn.sock
[Install]
WantedBy=multi-user.target
# /etc/nginx/sites-enabled/api.critical-notes.com
server {
    server_name api.critical-notes.com;
    root /var/www/api.critical-notes.com;
    location / {
        proxy_set_header Host $http_host;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection $connection_upgrade;
        proxy_redirect off;
        proxy_buffering off;
        proxy_pass http://uvicorn;
    }
    listen 443 ssl; # managed by Certbot
    ssl_certificate /etc/letsencrypt/live/api.critical-notes.com/fullchain.pem; # managed by Certbot
    ssl_certificate_key /etc/letsencrypt/live/api.critical-notes.com/privkey.pem; # managed by Certbot
    include /etc/letsencrypt/options-ssl-nginx.conf; # managed by Certbot
    ssl_dhparam /etc/letsencrypt/ssl-dhparams.pem; # managed by Certbot
}
server {
    if ($host = api.critical-notes.com) {
        return 301 https://$host$request_uri;
    } # managed by Certbot
    server_name api.critical-notes.com;
    listen 80;
    return 404; # managed by Certbot
}
map $http_upgrade $connection_upgrade {
    default upgrade;
    '' close;
}
upstream uvicorn {
    server unix:/tmp/uvicorn.sock;
}
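I'm at a loss: I don't understand why everything works perfectly locally but not in production. This is a big problem for my users, because real-time updates simply don't work most of the time :(
Version info: Channels 3.0.4 and Django 3.2.8.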
You need to add a new location to your nginx config to serve your WebSocket resources. Change your consumer route to /ws/updates.
location /ws/ {
    proxy_http_version 1.1;
    proxy_set_header Upgrade $http_upgrade;
    proxy_set_header Connection "Upgrade";
    proxy_set_header Host $http_host;
    proxy_pass http://uvicorn;
}
That way the consumer is reachable at /ws/updates. Your API will be served at / and your WebSocket at /ws.
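As a rough illustration of the route change, the sketch below compares the unanchored r"ws" pattern from the question with an anchored pattern for /ws/updates. The names OLD_ROUTE, WS_ROUTE and matches are hypothetical and exist only for this example; in routing.py the anchored string would be passed to re_path(...). It assumes the router removes the leading slash before matching, as Django's URL resolver does.

```python
import re

# Unanchored pattern from the question: matches any path containing "ws".
OLD_ROUTE = r"ws"
# Hypothetical anchored replacement for the /ws/updates route.
WS_ROUTE = r"^ws/updates/?$"


def matches(pattern: str, path: str) -> bool:
    """Return True if the regex is found in the path with its leading slash removed."""
    return re.search(pattern, path.lstrip("/")) is not None


if __name__ == "__main__":
    # Print how each pattern treats a few sample paths.
    for path in ("/ws/updates", "/ws/updates/", "/api/news", "/ws"):
        print(path, matches(OLD_ROUTE, path), matches(WS_ROUTE, path))
```

The unanchored pattern also accepts unrelated paths such as /api/news, while the anchored one only accepts the dedicated WebSocket path, which lines up with the location /ws/ prefix in the nginx block.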
It turns out I really needed to switch to channels_redis.core.RedisChannelLayer as the channel layer backend instead of using InMemoryChannelLayer.
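For reference, a minimal sketch of that settings change. It assumes the channels_redis package is installed and that a Redis server is reachable at 127.0.0.1:6379; REDIS_HOST and REDIS_PORT are placeholder names and values, not taken from the original setup. The in-memory layer keeps its groups inside a single process, so with --workers 4 each uvicorn worker has its own separate layer and a group_send only reaches consumers connected to the worker that handled the request. That would be consistent with only a fraction of the messages arriving.

```python
# settings.py (sketch)
# Assumption: Redis runs locally on its default port; adjust for your server.
REDIS_HOST = "127.0.0.1"
REDIS_PORT = 6379

CHANNEL_LAYERS = {
    "default": {
        # Shared backend: every worker process talks to the same Redis instance,
        # so a group_send from any worker can reach consumers in all workers.
        "BACKEND": "channels_redis.core.RedisChannelLayer",
        "CONFIG": {"hosts": [(REDIS_HOST, REDIS_PORT)]},
    }
}
```

With manage.py runserver there is only one process, which is why the in-memory layer appeared to work locally.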