频道根据权限限制消息发送

channels restrict message sending based on permissions

所以我有储物柜模型。

class Locker(CoreModel):
    """
    A locker owned by a single user and billed under one of four plans.

    Besides the model fields, the class carries two lookup tables:
    ``PLAN_MAPPING`` (plan id from settings -> plan constant) and
    ``SPACE_MAPPING`` (plan constant -> maximum space from settings).
    """
    # PLAN CHOICES
    # integer codes stored in the `plan` column
    BASIC = 0
    BEGINNER = 1
    STANDARD = 2
    BUSINESS = 3

    # (stored value, human readable label) pairs used as `choices` for `plan`
    PLAN_CHOICES = (
        (BASIC, 'Basic'),
        (BEGINNER, 'Beginner'),
        (STANDARD, 'Standard'),
        (BUSINESS, 'Business')
    )

    # translates the plan ids configured in settings into plan constants.
    # BASIC has no entry here.
    # NOTE(review): presumably the ids come from an external billing
    # provider and BASIC is the free tier - confirm.
    PLAN_MAPPING = {
        settings.BEGINNER_PLAN_ID: BEGINNER,
        settings.STANDARD_PLAN_ID: STANDARD,
        settings.BUSINESS_PLAN_ID: BUSINESS
    }

    # maximum space configured in settings for every plan
    # NOTE(review): the unit (bytes?) is not visible here - confirm.
    SPACE_MAPPING = {
        BASIC: settings.BASIC_MAX_SPACE,
        BEGINNER: settings.BEGINNER_MAX_SPACE,
        STANDARD: settings.STANDARD_MAX_SPACE,
        BUSINESS: settings.BUSINESS_MAX_SPACE
    }

    # optional free-text description (nullable, up to 1024 characters)
    topic = models.CharField(
        verbose_name=_('topic'),
        help_text=_("locker description"),
        null=True,
        max_length=1024
    )
    # image stored under 'icons/'; the validator is configured with
    # settings.MAX_LOCKER_ICON_SIZE as its size limit
    icon = models.ImageField(
        verbose_name=_('icon'),
        help_text=_("locker icon"),
        max_length=512,
        validators=(
            MaxFileSizeValidator(
                max_size=settings.MAX_LOCKER_ICON_SIZE
            ),
        ),
        upload_to='icons/',
    )
    # display name, between 2 and 100 characters long
    name = models.CharField(
        verbose_name=_('name'),
        help_text=_("locker name"),
        max_length=100,
        validators=(
            MinLengthValidator(2),
        )
    )
    # the owning user; deleting the user deletes their lockers as well.
    # Reverse accessor on the user: `owned_locker_set`.
    owner = models.ForeignKey(
        to=settings.AUTH_USER_MODEL,
        related_name='owned_locker_set',
        verbose_name=_('owner'),
        on_delete=models.CASCADE
    )
    # one of the PLAN_CHOICES values, BASIC unless set otherwise
    plan = models.SmallIntegerField(
        verbose_name=_('plan'),
        help_text=_('billing plan'),
        default=BASIC,
        choices=PLAN_CHOICES
    )

多个用户可以加入储物柜。

class GatewayEventsConsumer(AsyncWebsocketConsumer):
    """
    An ASGI consumer for gateway event sending.

    Lifecycle: the server sends HELLO once the socket is accepted, the
    client answers with IDENTIFY (carrying its token), the server
    replies READY and from then on relays events with the DISPATCH
    opcode. The client keeps its connection record fresh by sending
    HEARTBEAT, which is acknowledged with HEARTBEAT_ACK.
    """
    # OPCODES FOR GATEWAY
    # COMMANDS AND EVENTS
    DISPATCH = 0
    HEARTBEAT = 1
    IDENTIFY = 2
    READY = 3
    HELLO = 4
    HEARTBEAT_ACK = 5

    def __init__(self, *args, **kwargs):
        # forward whatever the framework passes to the base consumer
        # instead of assuming a zero-argument constructor
        super().__init__(*args, **kwargs)
        # Connection row of this socket; stays None until the client
        # has identified itself successfully
        self.connection = None
        # sequence number expected in the next heartbeat
        self.last_heartbeat = 0
        # handlers for gateway commands
        self.opcode_receivers = {
            self.IDENTIFY: self.identify,
            self.HEARTBEAT: self.heartbeat
        }

    @staticmethod
    async def _run_db(func, *args, **kwargs):
        """
        runs blocking ORM code in a worker thread so that
        it never executes on the event loop itself.
        """
        # imported locally so this class does not depend on
        # the surrounding module's import list
        from channels.db import database_sync_to_async
        return await database_sync_to_async(func)(*args, **kwargs)

    def authenticate_scope(self, token):
        """
        sets the scope's user to the token's
        user object after validating the same.
        We need to do this explicitly without
        any token-authentication-middleware
        because websocket connections with http
        headers are not supported in most cases.

        This method talks to the database synchronously;
        coroutines must call it through `_run_db`.

        Raises AuthenticationFailed if no such token exists.
        """
        try:
            token = Token.objects.get(key=token)
            self.scope['user'] = token.user
        except ObjectDoesNotExist:
            raise exceptions.AuthenticationFailed(
                _('invalid token')
            )

    async def connect(self):
        # No per-connection group is created here: group names may
        # only contain ASCII alphanumerics, hyphens, underscores and
        # periods, while layer-generated channel names contain a '!',
        # so the channel name cannot double as a group name. Messages
        # meant for this socket alone are written to it directly.
        await self.accept()
        # indicate the user that they
        # have connected to the gateway successfully
        await self.hello()

    async def disconnect(self, close_code: int):
        # delete the connection we created
        # earlier when the client disconnects
        connection = self.connection
        if connection is None:
            # the client never identified itself,
            # hence there is nothing to clean up
            return
        self.connection = None
        try:
            await connection.disconnect_buckets()
        finally:
            # drop the row even if leaving a group failed
            await self._run_db(connection.delete)

    async def receive(self, text_data=None, bytes_data=None):
        """
        converts json-data before parsing the
        gateway command issued by the client. Closes
        the connection if any ClientSideError occurs.
        Binary frames, malformed json and non-object
        messages are treated as invalid payloads.
        """
        try:
            try:
                data = json.loads(text_data)
            except (TypeError, ValueError) as exc:
                # TypeError: no text frame was sent (text_data is None)
                # ValueError: the frame did not contain valid json
                raise exceptions.InvalidPayload(
                    _('invalid json')
                ) from exc
            if not isinstance(data, dict):
                raise exceptions.InvalidPayload(
                    _('invalid json')
                )
            await self.parse(data)
        except exceptions.ClientSideError as exc:
            await self.close(exc.close_code)

    async def parse(self, data: dict):
        """
        An helper function which parses
        commands sent by the client. Raises
        InvalidOPCode for unknown opcodes and
        InvalidPayload if no `data` was supplied.
        """
        payload = data.get('data', None)
        opcode = data.get('opcode', None)
        try:
            handler = self.opcode_receivers.get(opcode, None)
        except TypeError:
            # unhashable opcode, e.g. a json array or object
            handler = None
        if handler is None:
            raise exceptions.InvalidOPCode(
                _('invalid opcode given')
            )
        if payload is None:
            raise exceptions.InvalidPayload(
                _('empty payload given')
            )
        # NOTE(review): handlers receive the whole message, not the
        # validated `payload`, so `token` / `sequence` are looked up at
        # the top level of the message. This looks unintended, but it
        # is kept as-is because changing it alters the wire protocol -
        # confirm against the client.
        await handler(data)

    async def dispatch_event(self, event: dict):
        """
        called when sending gateway commands or
        events. These events may not have the event
        name and the `data` object for the same is
        optional. The opcode defaults to DISPATCH.
        """
        opcode = event.get('opcode', self.DISPATCH)
        await self.send(
            text_data=json.dumps(
                {
                    'opcode': opcode,
                    'event': event.get('event', None),
                    'data': event.get('data', None)
                }
            )
        )

    async def heartbeat(self, data: dict):
        """
        touch the connection and send
        back response to indicate that
        we've received the heartbeat.
        Raises InvalidPayload if the sequence
        number is not the expected one.
        """
        sequence = data.get('sequence', None)
        # compare by value: identity comparison of integers only
        # happens to work for small, interpreter-cached numbers
        if sequence != self.last_heartbeat:
            raise exceptions.InvalidPayload(
                _('invalid sequence')
            )
        self.last_heartbeat += 1
        if self.connection is not None:
            # there is no connection row to
            # refresh before the client identifies
            await self._run_db(self.connection.touch)
        await self.heartbeat_ack()

    async def identify(self, data: dict):
        """
        authenticate the user and provide
        a valid state via ready if the given
        credentials were valid.
        Raises AlreadyAuthenticated, InvalidPayload
        or AuthenticationFailed otherwise.
        """
        user = self.scope.get('user')
        token = data.get('token', None)
        if user is not None and user.is_authenticated:
            # we cannot authenticate the user once
            # again. Close the session and wait for a
            # fresh identify packet from the client
            raise exceptions.AlreadyAuthenticated(
                _('already authenticated')
            )
        if token is None:
            raise exceptions.InvalidPayload(
                _('token not given')
            )
        await self._run_db(self.authenticate_scope, token)
        # create a connection object for the user
        # with the given channel id. This ensures
        # that we can get rid of stale connections
        # periodically by pruning with celery
        # Users must send heart beats to ensure
        # they don't get disconnected by the gateway.
        # The user is re-read from the scope because the
        # `user` local above predates the authentication.
        self.connection = await self._run_db(
            Connection.objects.create,
            user=self.scope['user'],
            channel_name=self.channel_name
        )
        await self.connection.connect_buckets()
        await self.ready()

    async def heartbeat_ack(self):
        """
        indicate that we've received
        the user's heartbeat. Clients
        can use this event to detect
        unusual connections
        """
        await self.dispatch_event(
            {
                'opcode': self.HEARTBEAT_ACK
            }
        )

    async def ready(self):
        """
        indicate that the client is in a
        connected state and is ready to
        receive gateway events
        """
        await self.dispatch_event(
            {
                'opcode': self.READY,
            }
        )

    async def hello(self):
        """
        indicate that the user's
        connection has been accepted
        """
        await self.dispatch_event(
            {
                'opcode': self.HELLO,
                'data': {
                    'interval': settings.CONNECTION_MAX_AGE
                }
            }
        )

这是我的消费者(consumer)类。

这是我的连接模型,代表一个活动的并发连接。

class Connection(CoreModel):
    """
    One live gateway connection of a user, identified by the channel
    name of the websocket consumer serving it. `last_seen` is set when
    the row is created and refreshed by `touch()`.
    """
    channel_name = models.CharField(
        verbose_name=_('channel name'),
        help_text=_("event stream channel name"),
        max_length=255,
        unique=True
    )
    user = models.ForeignKey(
        to=settings.AUTH_USER_MODEL,
        verbose_name=_('user'),
        on_delete=models.CASCADE
    )
    last_seen = models.DateTimeField(
        verbose_name=_('last seen'),
        help_text=_("when the last heartbeat was received at"),
        auto_now_add=True
    )

    objects = ConnectionsManager()

    class Meta(CoreModel.Meta):
        verbose_name = _('connection')
        verbose_name_plural = _('connections')

    def __str__(self):
        return self.channel_name

    def touch(self):
        """
        updates the `last_seen` field
        to the current time.
        """
        self.last_seen = now()
        self.save(update_fields=['last_seen'])

    def _locker_groups(self):
        """
        returns the group name of every locker the user is a member
        of. Group names are the stringified locker ids, matching what
        the event senders pass to `group_send`. Hits the database,
        so it must not be called directly from a coroutine.
        """
        return [
            str(member.locker.id)
            for member in self.user.member_set.all()
        ]

    async def connect_buckets(self):
        """
        helper function to add the connection to
        all the locker-channels the user is in.
        """
        # imported locally so this class does not depend on
        # the surrounding module's import list
        from channels.db import database_sync_to_async
        # the ORM is blocking; collect the names off the event loop
        groups = await database_sync_to_async(self._locker_groups)()
        for group in groups:
            await channel_layer.group_add(
                group=group,
                channel=self.channel_name
            )

    async def disconnect_buckets(self):
        """
        helper function to remove the connection
        from all the locker-channels the user is in.
        """
        # imported locally so this class does not depend on
        # the surrounding module's import list
        from channels.db import database_sync_to_async
        # the ORM is blocking; collect the names off the event loop
        groups = await database_sync_to_async(self._locker_groups)()
        for group in groups:
            await channel_layer.group_discard(
                group=group,
                channel=self.channel_name
            )

每当用户连接到 websocket 网关时,我都会把该用户的连接加入以其所属储物柜 ID 命名的群组(例如,如果用户是储物柜 A 和 B 的成员,其连接将加入群组 'A.id' 和 'B.id')。

我发送这样的事件。

def on_upload_save(instance=None, **kwargs):
    """
    Broadcast an upload event to the group of the upload's locker.

    Sends 'UPLOAD_CREATE' when the `created` keyword argument is
    truthy and 'UPLOAD_UPDATE' otherwise; the payload is the
    PartialUploadSerializer representation of `instance` either way.
    """
    was_created = kwargs.pop('created', False)
    event_name = 'UPLOAD_CREATE' if was_created else 'UPLOAD_UPDATE'
    broadcast = async_to_sync(channel_layer.group_send)
    outcome = broadcast(
        str(instance.locker.id),
        {
            'type': 'dispatch_event',
            'event': event_name,
            'data': PartialUploadSerializer(instance).data
        }
    )
    # only the "created" path hands the send result back to the caller
    return outcome if was_created else None

问题是,在发送这些事件的同时,我还想检查权限。例如,只有拥有 READ_UPLOADS 权限的用户才能收到 UPLOAD_CREATE 和 UPLOAD_UPDATE 事件。

用户对象上有一个方法,可以检查该用户是否拥有某项权限,比如

user.has_permission('UPLOAD_CREATE')

如何实现这样的系统?

在您的 async def dispatch_event(self, event: dict): 方法中,您可以通过 self.scope["user"] 访问当前用户,因此可以在这里按权限对事件进行过滤。

或者,您可以为每种类型的消息创建一个群组,然后让用户只订阅其有权查看的群组。(这是更好的解决方案。)