使用 django-channels 向单个用户发送消息

Sending a message to a single user using django-channels

我一直在尝试 django-channels,包括阅读文档和尝试示例。

我希望能够通过将新实例保存到数据库来向单个用户发送消息。

我的用例是创建一个新通知(通过 celery 任务),一旦保存通知,就将此通知发送给单个用户。

这听起来很有可能(来自 django-channels docs

...the crucial part is that you can run code (and so send on channels) in response to any event - and that includes ones you create. You can trigger on model saves, on other incoming messages, or from code paths inside views and forms.

然而,进一步阅读文档并尝试使用 django-channels examples,我看不出如何做到这一点。数据绑定和 liveblog 示例演示了发送到群组,但我看不出如何只发送到单个用户。

最好的方法是为该特定用户创建组。当 ws_connect 您可以将该用户添加到 Group("%s" % <user>).add(message.reply_channel)

Note: My websocket url is ws://127.0.0.1:8000/<user>

扩展@Flip 为该特定用户创建群组的回答。

在您的 python 函数中,在您的 ws_connect 函数中,您可以将该用户添加到一个专门为他们设立的组中:

consumers.py

from channels.auth import channel_session_user_from_http
from channels import Group

@channel_session_user_from_http
def ws_connect(message):
    if user.is_authenticated:
        Group("user-{}".format(user.id)).add(message.reply_channel)

要使用您的 python 代码向该用户发送消息:

我的view.py

import json
from channels import Group

def foo(user):
    if user.is_authenticated:
        Group("user-{}".format(user.id)).send({
            "text": json.dumps({
            "foo": 'bar'
        })
    })

如果他们已连接,他们将收到消息。如果用户未连接到 websocket,它将静默失败。

您还需要确保只将一个用户连接到每个用户的组,否则多个用户可能会收到一条消息,而您只打算为特定用户发送一条消息。

查看 django 通道示例,特别是 multichat 如何实现路由、在客户端创建 websocket 连接和设置 django_channels。

请确保您也查看了 django channels docs

只是为了扩展@luke_aus 的答案,如果您正在使用 ResourceBindings,您也可以这样做,只有用户 "owning" 一个对象检索这些更新:

就像@luke_aus 的回答一样,我们将用户注册到它自己的组中,我们可以在其中发布只能对该用户可见的操作(updatecreate)等:

from channels.auth import channel_session_user_from_http,
from channels import Group

@channel_session_user_from_http
def ws_connect(message):
    Group("user-%s" % message.user).add(message.reply_channel)

现在我们可以更改相应的绑定,使其仅在绑定对象属于该用户时才发布更改,假设模型如下:

class SomeUserOwnedObject(models.Model):
    owner = models.ForeignKey(User)

现在我们可以将此模型绑定到我们的用户组,所有操作(更新、创建等)将只发布给这个用户:

class SomeUserOwnedObjectBinding(ResourceBinding):
    # your binding might look like this:
    model = SomeUserOwnedObject
    stream = 'someuserownedobject'
    serializer_class = SomeUserOwnedObjectSerializer
    queryset = SomeUserOwnedObject.objects.all()

    # here's the magic to only publish to this user's group
    @classmethod
    def group_names(cls, instance, action):
        # note that this will also override all other model bindings
        # like `someuserownedobject-update` `someuserownedobject-create` etc
        return ['user-%s' % instance.owner.pk]

几乎没有更新,因为组在通道 2 中的工作方式与通道 1 中的不同。不再有组 class,如前所述 here

记录了新组 API here. See also here

对我有用的是:

# Required for channel communication
from channels.layers import get_channel_layer
from asgiref.sync import async_to_sync


def send_channel_message(group_name, message):
    channel_layer = get_channel_layer()
    async_to_sync(channel_layer.group_send)(
        '{}'.format(group_name),
        {
            'type': 'channel_message',
            'message': message
        }
    )

不要忘记在消费者中定义一个方法来处理消息类型!

    # Receive message from the group
    def channel_message(self, event):
        message = event['message']

        # Send message to WebSocket
        self.send(text_data=json.dumps({
            'message': message
        }))

频道 2 中,您可以将 self.channel_name 保存在 db on connect 方法中,该方法是每个用户的特定哈希。 Documentation here

from asgiref.sync import async_to_sync
from channels.generic.websocket import AsyncJsonWebsocketConsumer
import json

class Consumer(AsyncJsonWebsocketConsumer):
    async def connect(self):
        self.room_group_name = 'room'

        if self.scope["user"].is_anonymous:
            # Reject the connection
            await self.close()
        else:
            # Accept the connection
            await self.channel_layer.group_add(
                self.room_group_name,
                self.channel_name
            )

            await self.accept()

        print( self.channel_name )

最后一行 returns 类似于 specific.WxuYsxLK!owndoeYTkLBw

您可以将此特定哈希保存在用户的table。

虽然晚了,但我有一个针对通道 2 的直接解决方案,即使用 send 而不是 group_send

send(self, channel, message)
 |      Send a message onto a (general or specific) channel.

用作 -

await self.channel_layer.send(
            self.channel_name,
            {
                'type':'bad_request',
                'user':user.username,
                'message':'Insufficient Amount to Play',
                'status':'400'
            }
        )

交易它 -

await self.send(text_data=json.dumps({
            'type':event['type'],
            'message': event['message'],
            'user': event['user'],
            'status': event['status']
        }))

谢谢