Django-channels:ChatConsumer 只向一个用户发送消息,而不是向两个用户发送消息

Django-channels : ChatConsumer is sending message to only one user instead of sending it to both users

我正在使用 django 和 angular 使用 django-channels 和 redis 实现聊天应用程序。

套接字已连接并正常工作,但我面临的问题是,当两个用户在线并使用同一线程连接同一个聊天室时 url 它会连接,但任何用户发送的消息仅发送给最近连接套接字的用户,它们只发送两次给该用户。

在django中我做了如下配置:

settings/base.py

....
INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.sites',

    'channels',
    'chats'
]

ASGI_APPLICATION = "influnite.routing.application"

CHANNEL_LAYERS = {
    "default": {
        "BACKEND": "channels_redis.core.RedisChannelLayer",
        "CONFIG": {
            "hosts": [("localhost", 6379)],
            # "hosts": [os.environ.get('REDIS_URL', 'redis://localhost:6379')]
        },
    },
}
....

routing.py

from django.conf.urls import url
from channels.routing import ProtocolTypeRouter, URLRouter
from channels.auth import AuthMiddlewareStack
from channels.security.websocket import AllowedHostsOriginValidator, OriginValidator

from chats.consumers import ChatConsumer

application = ProtocolTypeRouter({
    'websocket': AllowedHostsOriginValidator(
        AuthMiddlewareStack(
            URLRouter(
                [
                    url(r"^messages/(?P<thread_id>[\w.+]+)/", ChatConsumer())
                ]
            )
        )
    )
})

我创建了三个模型,即 ThreadThreadMemberChatMessage

chats/models.py

from django.db import models
from django.db.models import Q
from django.utils import timezone
from django.contrib.auth.models import User

from base.models import BaseModel

# Create your models here.
MESSAGE_TYPE = [
    ('text', 'Text'),
    ('audio', 'Audio'),
    ('img', 'Image'),
    ('doc', 'Document'),
    ('link', 'Link')
]

THREAD_TYPE = [
    ('individual', 'Individual'),
    ('group', 'Group')
]

class Thread(BaseModel):
    name = models.CharField(max_length=20, null=True, blank=True)
    timestamp = models.DateTimeField(auto_now_add=True)
    thread_type = models.CharField(max_length=20, choices=THREAD_TYPE, default='individual')

    class Meta:
        db_table = 'in_thread'
        verbose_name = 'threads'
        verbose_name_plural = 'thread'
        ordering = ['-update_date']

class ThreadMember(BaseModel):
    thread = models.ForeignKey(Thread, on_delete=models.CASCADE, related_name='thread_member')
    user = models.ForeignKey(User, on_delete=models.CASCADE, related_name='thread_member_user')
    is_grp_admin = models.BooleanField(default=False)

    def __str__(self):
        return f'{self.thread.name} > {self.user}'

    class Meta:
        db_table = 'in_thread_member'
        verbose_name = 'thread members'
        verbose_name_plural = 'thread member'

class ChatMessage(BaseModel):
    thread = models.ForeignKey(Thread, on_delete=models.CASCADE, related_name='msg_thread')
    sender = models.ForeignKey(ThreadMember, on_delete=models.CASCADE, related_name='msg_sender')
    message = models.TextField(null=True, blank=True)
    sent_at = models.DateTimeField(default=timezone.now())
    read_receipt = models.BooleanField(default=False)
    msg_type = models.CharField(max_length=20, choices=MESSAGE_TYPE, default='text')

    def __str__(self):
        return f'{self.sender} > {self.message}'

    class Meta:
        db_table = 'in_chat_message'
        verbose_name = 'chat message'
        verbose_name_plural = 'chat messages'
        ordering = ['sent_at']

下面是 consumers.py 文件,其中包含 ChatConsumer class。

chats/consumers.py

from django.contrib.auth.models import User

import asyncio, json
from channels.consumer import AsyncConsumer
from channels.db import database_sync_to_async

from .models import Thread, ThreadMember, ChatMessage
from .serializers import ChatMessageSerializer

class ChatConsumer(AsyncConsumer):
    async def websocket_connect(self, event):
        print("connected", event)
        try:
            kwargs = self.scope['url_route']['kwargs']

            thread_id = kwargs.get('thread_id', False)
            if thread_id:
                thread = await self.get_thread(thread_id)
                if thread:
                    self.chat_room = f'thread_{thread_id}'
                    await self.channel_layer.group_add(
                        self.chat_room,
                        self.channel_name
                    )
                    await self.send({
                        "type": "websocket.accept"
                    })
            else:
                await self.send({
                    "type": "websocket.close"
                })
        except Exception as e:
            print("Error in websocket connection!")
            print(e)

    async def websocket_receive(self, event):
        print("receive", event)
        try:
            kwargs = self.scope['url_route']['kwargs']
            
            thread_id = kwargs.get('thread_id', False)
            thread = await self.get_thread(thread_id)

            response = event.get('text', False)
            response = json.loads(response)
            message = response.get('message', False)

            if message:
                data, message_saved = await self.save_message(
                    message, response.get('sender'), thread)

                if message_saved:
                    text = json.dumps(response)

                    if thread:
                        await self.channel_layer.group_send(
                            self.chat_room,
                            {
                                "type": "chat_message",
                                "text": text
                            }
                        )
        except Exception as e:
            print("Error in websocket receive!")
            print(e)
    
    async def websocket_disconnect(self, event):
        print("disconnected", event)

    async def chat_message(self, event):
        """sends the actual message"""
        try:
            await self.send({
                "type": "websocket.send",
                "text": event['text']
            })
        except Exception as e:
            print("Error sending messages!")
            print(e)

    @database_sync_to_async
    def get_thread(self, thread_id):
        return Thread.objects.get(id=thread_id)
    
    @database_sync_to_async
    def save_message(self, message, sender, thread):
        try:
            sender = ThreadMember.objects.get(
                thread=thread.id,
                user=User.objects.get(id=sender))
            chat = ChatMessage.objects.create(
                thread=thread,
                sender=sender,
                message=message
            )
            chat.save()
            thread.save()
            return ChatMessageSerializer(chat).data, True
        except Exception as e:
            print("Error saving chat!")
            print(e)
            return False

当我 运行 redis 服务器时,我得到以下信息。

C:\Users\rh>cd C:\Program Files\Redis
C:\Program Files\Redis>redis-server redis.windows.conf
                _._
           _.-``__ ''-._
      _.-``    `.  `_.  ''-._           Redis 5.0.10 (1c047b68/0) 64 bit
  .-`` .-```.  ```\/    _.,_ ''-._
 (    '      ,       .-`  | `,    )     Running in standalone mode
 |`-._`-...-` __...-.``-._|'` _.-'|     Port: 6379
 |    `-._   `._    /     _.-'    |     PID: 9628
  `-._    `-._  `-./  _.-'    _.-'
 |`-._`-._    `-.__.-'    _.-'_.-'|
 |    `-._`-._        _.-'_.-'    |           http://redis.io
  `-._    `-._`-.__.-'_.-'    _.-'
 |`-._`-._    `-.__.-'    _.-'_.-'|
 |    `-._`-._        _.-'_.-'    |
  `-._    `-._`-.__.-'_.-'    _.-'
      `-._    `-.__.-'    _.-'
          `-._        _.-'
              `-.__.-'

[9628] 11 Dec 12:38:17.011 # Server initialized
[9628] 11 Dec 12:38:17.011 * DB loaded from disk: 0.000 seconds
[9628] 11 Dec 12:38:17.011 * Ready to accept connections

我不知道自己做错了什么,感谢您的帮助。如果需要更多信息,我会改进我的问题。

提前致谢!

试试这个:

  1. django.core.asgi 导入 get_asgi_application
from django.core.asgi import get_asgi_application
  1. 允许 Django 的 ASGI 应用程序处理传统的 http 请求,方法是在 ProtocolTypeRouter.[= 中的字典中添加 http 作为键和 get_asgi_application() 作为键的值36=]
application = ProtocolTypeRouter({
   "http": get_asgi_application(),
   ....
})
  1. use/call as_asgi() 类方法同时路由 ChatConsumer 消费者。
url(
   r"^messages/(?P<username>[\w.@+-]+)/$",
   ChatConsumer.as_asgi()
),

routing.py

from django.conf.urls import url
from django.core.asgi import get_asgi_application

from channels.routing import ProtocolTypeRouter, URLRouter
from channels.auth import AuthMiddlewareStack
from channels.security.websocket import (
     AllowedHostsOriginValidator,
     OriginValidator
)

from chat.consumers import ChatConsumer

application = ProtocolTypeRouter({
    "http": get_asgi_application(),
    "websocket": AllowedHostsOriginValidator(
        AuthMiddlewareStack(
            URLRouter([
                url(
                   r"^messages/(?P<username>[\w.@+-]+)/$",
                   ChatConsumer.as_asgi()
                ),
           ])
        )
    )
})

有关 channels.readthedocs.io 的更多信息。

就我而言,我需要将 channels 更新到至少 3.0.1(由于 https://github.com/django/channels/issues/1550)。