Django ASGI 通道应用程序不跟踪多个 websocket 客户端(使用 redis 和 nginx)

Django ASGI channels application not keeping track of multiple websocket clients (using redis and nginx)

编辑:

我在为我的 websockets 定义路由时错过了 .as_asgi() ,这意味着我创建的每个实例都会被我创建的下一个实例覆盖。

我的 routing.py 现在看起来像:

application = ProtocolTypeRouter({
    'websocket': AllowedHostsOriginValidator(
        AuthMiddlewareStack(
            URLRouter(
                [
                    url("calling/interface/", ChatConsumer.as_asgi()),
                ]
            )
        )
    )
})

原问题:

我在本地虚拟机上安装了带有 asgi 频道的 Django,我也在使用 redis-channels 和 nginx。我能够与一个用户连接并使用我的应用程序,但是一旦创建第二个连接,旧的(原始)客户端就会停止接收网络套接字信息,但它仍然可以发送它。我的 consumers.py 收到了所有的 websocket 数据,但认为它们都是来自最新的连接。这意味着第一个 ws 连接关闭但停止接收信息。

版本:

Python = 3.9.2
Django = 3.1.7
Channels = 3.0.3
channels-redis = 3.2.0
redis = 5.0.3

我已经能够从消费者内部和外部发送 websocket 信息来处理 webhook 响应,我只是无法处理消费者的多个连接。

Nginx 配置:

worker_processes  1;

events {
    worker_connections  1024; 
}

http {
    include       mime.types;
    default_type  application/octet-stream;
    sendfile        on;
    keepalive_timeout  65;

    upstream websocket {
        server 127.0.0.1:8000;
    }

    server {
        listen       80;
        server_name  localhost;        

        location / {
            proxy_pass         http://websocket;
            proxy_http_version 1.1;
            proxy_set_header   Upgrade $http_upgrade;
            proxy_set_header   Connection "upgrade";
    
        proxy_redirect off;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Formwarded-Host $server_name;
        
        } # end location
    } # end server
} # end http

消费者(减少):

class ChatConsumer(AsyncWebsocketConsumer):
    async def connect(self):
        self.username = self.scope["session"]["tokenOwner"]

        await self.accept()
        await self.send(text_data="[Welcome %s!]" % self.username)
        token = self.scope["session"]["token"]
        webhook_response = await create_webhook()

        await create_record(webhook_response.id)

        print(f"New Channel: {self.channel_name}")
        print(f"Webhook ID: {webhook_response.id}")

    

    async def disconnect(self, message):
        token = self.scope["session"]["token"]
        closing_channel_name = self.channel_name

        WebhookID = await get_self_record()

        await asyncio.gather(delete_webhook(WebhookID))
        await delete_record()


    async def receive(self, text_data=None, bytes_data=None):
    
        token = self.scope["session"]["token"]
    
        instructions = text_data.split("_")
        params = instructions[1:]
    
        #instructions are send as a string with data split by '_'
        #I handle the args sent and call a function based on them

    async def call_update(self, event):
       await self.send(text_data=event['text'])

如果您需要更多信息,请告诉我!

我在为我的 websockets 定义路由时错过了 .as_asgi(),这意味着我创建的每个实例都被我创建的下一个实例覆盖。

我的 routing.py 现在看起来像:

application = ProtocolTypeRouter({
    'websocket': AllowedHostsOriginValidator(
        AuthMiddlewareStack(
            URLRouter(
                [
                    url("calling/interface/", ChatConsumer.as_asgi()),
                ]
            )
        )
    )
})