频道根据权限限制消息发送
channels restrict message sending based on permissions
我有一个储物柜(Locker)模型。
class Locker(CoreModel):
    """
    A locker owned by one user and assigned to one billing plan.
    """

    # PLAN CHOICES
    # integer values stored in the `plan` column
    BASIC = 0
    BEGINNER = 1
    STANDARD = 2
    BUSINESS = 3

    # (stored value, display label) pairs offered by the `plan` field
    PLAN_CHOICES = (
        (BASIC, 'Basic'),
        (BEGINNER, 'Beginner'),
        (STANDARD, 'Standard'),
        (BUSINESS, 'Business'),
    )

    # plan identifiers configured in settings -> local plan constant
    # (BASIC has no entry in this mapping)
    PLAN_MAPPING = {
        settings.BEGINNER_PLAN_ID: BEGINNER,
        settings.STANDARD_PLAN_ID: STANDARD,
        settings.BUSINESS_PLAN_ID: BUSINESS,
    }

    # local plan constant -> maximum space configured for that plan
    SPACE_MAPPING = {
        BASIC: settings.BASIC_MAX_SPACE,
        BEGINNER: settings.BEGINNER_MAX_SPACE,
        STANDARD: settings.STANDARD_MAX_SPACE,
        BUSINESS: settings.BUSINESS_MAX_SPACE,
    }

    # optional free-text description (NULL allowed at the database level)
    topic = models.CharField(
        max_length=1024,
        null=True,
        verbose_name=_('topic'),
        help_text=_("locker description"),
    )

    # image stored under `icons/`; its size is capped by the validator
    icon = models.ImageField(
        upload_to='icons/',
        max_length=512,
        validators=(
            MaxFileSizeValidator(max_size=settings.MAX_LOCKER_ICON_SIZE),
        ),
        verbose_name=_('icon'),
        help_text=_("locker icon"),
    )

    # display name, between 2 and 100 characters long
    name = models.CharField(
        max_length=100,
        validators=(MinLengthValidator(2),),
        verbose_name=_('name'),
        help_text=_("locker name"),
    )

    # deleting the owning user deletes the locker as well
    owner = models.ForeignKey(
        to=settings.AUTH_USER_MODEL,
        on_delete=models.CASCADE,
        related_name='owned_locker_set',
        verbose_name=_('owner'),
    )

    # one of PLAN_CHOICES; new lockers start on the BASIC plan
    plan = models.SmallIntegerField(
        choices=PLAN_CHOICES,
        default=BASIC,
        verbose_name=_('plan'),
        help_text=_('billing plan'),
    )
多个用户可以加入储物柜。
class GatewayEventsConsumer(AsyncWebsocketConsumer):
    """
    An ASGI consumer for gateway event sending.

    Every frame exchanged with the client is a JSON object. Frames sent
    by the gateway carry `opcode`, `event` and `data`. After the socket
    is accepted the gateway sends HELLO; the client is then expected to
    IDENTIFY with a token and to keep sending HEARTBEAT commands.
    """

    # OPCODES FOR GATEWAY
    # COMMANDS AND EVENTS
    DISPATCH = 0
    HEARTBEAT = 1
    IDENTIFY = 2
    READY = 3
    HELLO = 4
    HEARTBEAT_ACK = 5

    def __init__(self, *args, **kwargs):
        # forward whatever the framework passes to the base consumer
        super().__init__(*args, **kwargs)
        # Connection row created by `identify`; stays None until then
        self.connection = None
        # sequence number the next heartbeat is expected to carry
        self.last_heartbeat = 0
        # handlers for gateway commands
        self.opcode_receivers = {
            self.IDENTIFY: self.identify,
            self.HEARTBEAT: self.heartbeat,
        }

    @staticmethod
    async def _run_db(func, *args, **kwargs):
        """
        run the blocking (ORM) callable `func` in a worker thread
        and return its result, so that the event loop is never
        blocked and Django's async-safety check is not triggered.
        """
        # imported here so the class does not rely on an import
        # that may be missing from the module header
        from channels.db import database_sync_to_async
        return await database_sync_to_async(func)(*args, **kwargs)

    async def _send_to_self(self, message: dict):
        """
        send `message` to this connection's private
        group so that `dispatch_event` delivers it.
        """
        await self.channel_layer.group_send(self.channel_name, message)

    @staticmethod
    def _decode(text_data):
        """
        turn a received text frame into a dict. Raises
        InvalidPayload for binary frames, malformed json
        and json values that are not objects.
        """
        try:
            data = json.loads(text_data)
        except (TypeError, ValueError) as exc:
            # TypeError: no text frame (text_data is None)
            # ValueError: malformed json (JSONDecodeError)
            raise exceptions.InvalidPayload(
                _('invalid payload given')
            ) from exc
        if not isinstance(data, dict):
            raise exceptions.InvalidPayload(
                _('invalid payload given')
            )
        return data

    def authenticate_scope(self, token):
        """
        sets the scope's user to the token's
        user object after validating the same.
        We need to do this explicitly without
        any token-authentication-middleware
        because websocket connections with http
        headers are not supported in most cases.

        This method queries the database synchronously;
        coroutines must call it through `_run_db`.
        Raises AuthenticationFailed for unknown tokens.
        """
        try:
            token_obj = Token.objects.get(key=token)
            self.scope['user'] = token_obj.user
        except ObjectDoesNotExist as exc:
            raise exceptions.AuthenticationFailed(
                _('invalid token')
            ) from exc

    async def connect(self):
        # add the user to their private group. Because
        # we add the user before they identify themselves
        # via the gateway we cannot get the user's id.
        # Hence we use the channel name as a substitute
        # for the private group's unique identifier
        # NOTE(review): channel names generated by a channel layer
        # may contain characters that are not accepted in group
        # names - confirm the configured layer allows this.
        await self.channel_layer.group_add(
            group=self.channel_name,
            channel=self.channel_name
        )
        await self.accept()
        # indicate the user that they
        # have connected to the gateway successfully
        await self.hello()

    async def disconnect(self, close_code: int):
        # the client may disconnect before it has identified
        # itself, in which case there is no connection row
        connection, self.connection = self.connection, None
        if connection is not None:
            await connection.disconnect_buckets()
        await self.channel_layer.group_discard(
            group=self.channel_name,
            channel=self.channel_name
        )
        if connection is not None:
            # delete the connection we created
            # earlier when the client disconnects
            await self._run_db(connection.delete)

    async def receive(self, text_data=None, bytes_data=None):
        """
        converts json-data before parsing the
        gateway command issued by the client. Closes
        the connection if any ClientSideError occurs,
        including frames that are not a json object.
        """
        try:
            await self.parse(self._decode(text_data))
        except exceptions.ClientSideError as exc:
            await self.close(exc.close_code)

    async def parse(self, data: dict):
        """
        An helper function which parses
        commands sent by the client. Raises
        InvalidOPCode for unknown opcodes and
        InvalidPayload when `data` is missing.
        """
        payload = data.get('data', None)
        opcode = data.get('opcode', None)
        handler = None
        # only plain integers can be opcodes; this also keeps
        # unhashable json values (lists, objects) out of the lookup
        if isinstance(opcode, int) and not isinstance(opcode, bool):
            handler = self.opcode_receivers.get(opcode, None)
        if handler is None:
            raise exceptions.InvalidOPCode(
                _('invalid opcode given')
            )
        if payload is None:
            raise exceptions.InvalidPayload(
                _('empty payload given')
            )
        # NOTE(review): the handlers receive the whole message, so
        # they read `token` / `sequence` from its top level and not
        # from `payload` - confirm this is the intended protocol.
        await handler(data)

    async def dispatch_event(self, event: dict):
        """
        called when sending gateway commands or
        events. These events may not have the event
        name and the `data` object for the same is
        optional. The opcode defaults to DISPATCH.
        """
        await self.send(
            text_data=json.dumps(
                {
                    'opcode': event.get('opcode', self.DISPATCH),
                    'event': event.get('event', None),
                    'data': event.get('data', None)
                }
            )
        )

    async def heartbeat(self, data: dict):
        """
        touch the connection and send
        back response to indicate that
        we've received the heartbeat.
        Raises AuthenticationFailed when the
        client has not identified yet and
        InvalidPayload for a wrong sequence.
        """
        if self.connection is None:
            # nothing to touch before a successful identify
            raise exceptions.AuthenticationFailed(
                _('not identified')
            )
        sequence = data.get('sequence', None)
        # compare by value: identity (`is`) is only
        # reliable for the small integers CPython caches
        if sequence != self.last_heartbeat:
            raise exceptions.InvalidPayload(
                _('invalid sequence')
            )
        self.last_heartbeat += 1
        await self._run_db(self.connection.touch)
        await self.heartbeat_ack()

    async def identify(self, data: dict):
        """
        authenticate the user and provide
        a valid state via ready if the given
        credentials were valid
        """
        user = self.scope.get('user')
        token = data.get('token', None)
        # the scope has no user at all when no
        # authentication middleware is installed
        if getattr(user, 'is_authenticated', False):
            # we cannot authenticate the user once
            # again. Close the session and wait for a
            # fresh identify packet from the client
            raise exceptions.AlreadyAuthenticated(
                _('already authenticated')
            )
        if token is None:
            raise exceptions.InvalidPayload(
                _('token not given')
            )
        await self._run_db(self.authenticate_scope, token)
        self.connection = await self._run_db(
            Connection.objects.create,
            # create a connection object for the user
            # with the given channel id. This ensures
            # that we can get rid of stale connections
            # periodically by pruning with celery
            # Users must send heart beats to ensure
            # they don't get disconnected by the gateway
            # The user is read back from the scope: the local
            # `user` still refers to the pre-authentication value
            user=self.scope['user'],
            channel_name=self.channel_name
        )
        await self.connection.connect_buckets()
        await self.ready()

    async def heartbeat_ack(self):
        """
        indicate that we've received
        the user's heartbeat. Clients
        can use this event to detect
        unusual connections
        """
        await self._send_to_self(
            {
                'type': 'dispatch_event',
                'opcode': self.HEARTBEAT_ACK
            }
        )

    async def ready(self):
        """
        indicate that the client is in a
        connected state and is ready to
        receive gateway events
        """
        await self._send_to_self(
            {
                'type': 'dispatch_event',
                'opcode': self.READY,
            }
        )

    async def hello(self):
        """
        indicate that the user's
        connection has been accepted
        """
        await self._send_to_self(
            {
                'type': 'dispatch_event',
                'opcode': self.HELLO,
                'data': {
                    'interval': settings.CONNECTION_MAX_AGE
                }
            }
        )
这是我的消费者(consumer)类。
这是我的连接模型,代表一个活动的并发连接。
class Connection(CoreModel):
    """
    An active gateway connection of a user, identified
    by the channel name of the consumer serving it.
    """

    channel_name = models.CharField(
        verbose_name=_('channel name'),
        help_text=_("event stream channel name"),
        max_length=255,
        unique=True
    )
    user = models.ForeignKey(
        to=settings.AUTH_USER_MODEL,
        verbose_name=_('user'),
        on_delete=models.CASCADE
    )
    # set on creation and refreshed by `touch`
    last_seen = models.DateTimeField(
        verbose_name=_('last seen'),
        help_text=_("when the last heartbeat was received at"),
        auto_now_add=True
    )

    objects = ConnectionsManager()

    class Meta(CoreModel.Meta):
        verbose_name = _('connection')
        verbose_name_plural = _('connections')

    def __str__(self):
        return self.channel_name

    def touch(self):
        """
        updates the `last_seen` field
        to the current time.
        """
        self.last_seen = now()
        # write only the column that changed
        self.save(update_fields=['last_seen'])

    async def _locker_groups(self):
        """
        returns the group name of every locker the user is a
        member of. Group names are the locker ids converted to
        strings, which is what event senders in this module use.
        """
        # imported here so the model does not rely on an import
        # that may be missing from the module header
        from channels.db import database_sync_to_async

        def collect():
            return [
                str(member.locker.id)
                for member in self.user.member_set.all()
            ]

        # the queryset must not be evaluated on the event loop
        return await database_sync_to_async(collect)()

    async def connect_buckets(self):
        """
        helper function to add the connection to
        all the locker-channels the user is in.
        """
        for group in await self._locker_groups():
            await channel_layer.group_add(
                group=group,
                channel=self.channel_name
            )

    async def disconnect_buckets(self):
        """
        helper function to remove the connection
        from all the locker-channels the user is in.
        """
        for group in await self._locker_groups():
            await channel_layer.group_discard(
                group=group,
                channel=self.channel_name
            )
每当用户连接到 WebSocket 网关时,我都会按其所属储物柜的 ID 把该连接加入对应的群组(例如,用户同时是储物柜 A 和 B 的成员,则会加入群组 'A.id' 和 'B.id')。
我发送这样的事件。
def on_upload_save(instance=None, **kwargs):
    """
    Broadcast a saved upload to its locker's group.

    Sends `UPLOAD_CREATE` when the `created` keyword argument is truthy
    and `UPLOAD_UPDATE` otherwise; the event data is the upload
    serialized with `PartialUploadSerializer`.
    """
    was_created = kwargs.pop('created', False)
    event_name = 'UPLOAD_CREATE' if was_created else 'UPLOAD_UPDATE'

    # the handler is synchronous, so the coroutine is wrapped
    broadcast = async_to_sync(channel_layer.group_send)
    outcome = broadcast(
        str(instance.locker.id),
        {
            'type': 'dispatch_event',
            'event': event_name,
            'data': PartialUploadSerializer(instance).data
        }
    )
    # only the "created" case hands the result of the send back
    return outcome if was_created else None
问题是,在发送这些事件的同时,我还想检查权限。
例如,只有拥有 READ_UPLOADS
权限的用户才能收到 UPLOAD_CREATE
和 UPLOAD_UPDATE
事件。
用户对象上有一个方法,可以检查该用户是否拥有某项权限,例如
user.has_permission('UPLOAD_CREATE')
如何实现这样的系统?
在您的 async def dispatch_event(self, event: dict):
方法中,您可以通过 self.scope["user"]
获取当前用户,因此可以在这里按权限过滤事件。
或者,您可以为每种类型的消息创建一个群组,然后只让用户订阅其有权查看的群组。(这是更好的解决方案。)
我有一个储物柜(Locker)模型。
class Locker(CoreModel):
    """
    A locker owned by one user and assigned to one billing plan.
    """

    # PLAN CHOICES
    # integer values stored in the `plan` column
    BASIC = 0
    BEGINNER = 1
    STANDARD = 2
    BUSINESS = 3

    # (stored value, display label) pairs offered by the `plan` field
    PLAN_CHOICES = (
        (BASIC, 'Basic'),
        (BEGINNER, 'Beginner'),
        (STANDARD, 'Standard'),
        (BUSINESS, 'Business'),
    )

    # plan identifiers configured in settings -> local plan constant
    # (BASIC has no entry in this mapping)
    PLAN_MAPPING = {
        settings.BEGINNER_PLAN_ID: BEGINNER,
        settings.STANDARD_PLAN_ID: STANDARD,
        settings.BUSINESS_PLAN_ID: BUSINESS,
    }

    # local plan constant -> maximum space configured for that plan
    SPACE_MAPPING = {
        BASIC: settings.BASIC_MAX_SPACE,
        BEGINNER: settings.BEGINNER_MAX_SPACE,
        STANDARD: settings.STANDARD_MAX_SPACE,
        BUSINESS: settings.BUSINESS_MAX_SPACE,
    }

    # optional free-text description (NULL allowed at the database level)
    topic = models.CharField(
        max_length=1024,
        null=True,
        verbose_name=_('topic'),
        help_text=_("locker description"),
    )

    # image stored under `icons/`; its size is capped by the validator
    icon = models.ImageField(
        upload_to='icons/',
        max_length=512,
        validators=(
            MaxFileSizeValidator(max_size=settings.MAX_LOCKER_ICON_SIZE),
        ),
        verbose_name=_('icon'),
        help_text=_("locker icon"),
    )

    # display name, between 2 and 100 characters long
    name = models.CharField(
        max_length=100,
        validators=(MinLengthValidator(2),),
        verbose_name=_('name'),
        help_text=_("locker name"),
    )

    # deleting the owning user deletes the locker as well
    owner = models.ForeignKey(
        to=settings.AUTH_USER_MODEL,
        on_delete=models.CASCADE,
        related_name='owned_locker_set',
        verbose_name=_('owner'),
    )

    # one of PLAN_CHOICES; new lockers start on the BASIC plan
    plan = models.SmallIntegerField(
        choices=PLAN_CHOICES,
        default=BASIC,
        verbose_name=_('plan'),
        help_text=_('billing plan'),
    )
多个用户可以加入储物柜。
class GatewayEventsConsumer(AsyncWebsocketConsumer):
    """
    An ASGI consumer for gateway event sending.

    Every frame exchanged with the client is a JSON object. Frames sent
    by the gateway carry `opcode`, `event` and `data`. After the socket
    is accepted the gateway sends HELLO; the client is then expected to
    IDENTIFY with a token and to keep sending HEARTBEAT commands.
    """

    # OPCODES FOR GATEWAY
    # COMMANDS AND EVENTS
    DISPATCH = 0
    HEARTBEAT = 1
    IDENTIFY = 2
    READY = 3
    HELLO = 4
    HEARTBEAT_ACK = 5

    def __init__(self, *args, **kwargs):
        # forward whatever the framework passes to the base consumer
        super().__init__(*args, **kwargs)
        # Connection row created by `identify`; stays None until then
        self.connection = None
        # sequence number the next heartbeat is expected to carry
        self.last_heartbeat = 0
        # handlers for gateway commands
        self.opcode_receivers = {
            self.IDENTIFY: self.identify,
            self.HEARTBEAT: self.heartbeat,
        }

    @staticmethod
    async def _run_db(func, *args, **kwargs):
        """
        run the blocking (ORM) callable `func` in a worker thread
        and return its result, so that the event loop is never
        blocked and Django's async-safety check is not triggered.
        """
        # imported here so the class does not rely on an import
        # that may be missing from the module header
        from channels.db import database_sync_to_async
        return await database_sync_to_async(func)(*args, **kwargs)

    async def _send_to_self(self, message: dict):
        """
        send `message` to this connection's private
        group so that `dispatch_event` delivers it.
        """
        await self.channel_layer.group_send(self.channel_name, message)

    @staticmethod
    def _decode(text_data):
        """
        turn a received text frame into a dict. Raises
        InvalidPayload for binary frames, malformed json
        and json values that are not objects.
        """
        try:
            data = json.loads(text_data)
        except (TypeError, ValueError) as exc:
            # TypeError: no text frame (text_data is None)
            # ValueError: malformed json (JSONDecodeError)
            raise exceptions.InvalidPayload(
                _('invalid payload given')
            ) from exc
        if not isinstance(data, dict):
            raise exceptions.InvalidPayload(
                _('invalid payload given')
            )
        return data

    def authenticate_scope(self, token):
        """
        sets the scope's user to the token's
        user object after validating the same.
        We need to do this explicitly without
        any token-authentication-middleware
        because websocket connections with http
        headers are not supported in most cases.

        This method queries the database synchronously;
        coroutines must call it through `_run_db`.
        Raises AuthenticationFailed for unknown tokens.
        """
        try:
            token_obj = Token.objects.get(key=token)
            self.scope['user'] = token_obj.user
        except ObjectDoesNotExist as exc:
            raise exceptions.AuthenticationFailed(
                _('invalid token')
            ) from exc

    async def connect(self):
        # add the user to their private group. Because
        # we add the user before they identify themselves
        # via the gateway we cannot get the user's id.
        # Hence we use the channel name as a substitute
        # for the private group's unique identifier
        # NOTE(review): channel names generated by a channel layer
        # may contain characters that are not accepted in group
        # names - confirm the configured layer allows this.
        await self.channel_layer.group_add(
            group=self.channel_name,
            channel=self.channel_name
        )
        await self.accept()
        # indicate the user that they
        # have connected to the gateway successfully
        await self.hello()

    async def disconnect(self, close_code: int):
        # the client may disconnect before it has identified
        # itself, in which case there is no connection row
        connection, self.connection = self.connection, None
        if connection is not None:
            await connection.disconnect_buckets()
        await self.channel_layer.group_discard(
            group=self.channel_name,
            channel=self.channel_name
        )
        if connection is not None:
            # delete the connection we created
            # earlier when the client disconnects
            await self._run_db(connection.delete)

    async def receive(self, text_data=None, bytes_data=None):
        """
        converts json-data before parsing the
        gateway command issued by the client. Closes
        the connection if any ClientSideError occurs,
        including frames that are not a json object.
        """
        try:
            await self.parse(self._decode(text_data))
        except exceptions.ClientSideError as exc:
            await self.close(exc.close_code)

    async def parse(self, data: dict):
        """
        An helper function which parses
        commands sent by the client. Raises
        InvalidOPCode for unknown opcodes and
        InvalidPayload when `data` is missing.
        """
        payload = data.get('data', None)
        opcode = data.get('opcode', None)
        handler = None
        # only plain integers can be opcodes; this also keeps
        # unhashable json values (lists, objects) out of the lookup
        if isinstance(opcode, int) and not isinstance(opcode, bool):
            handler = self.opcode_receivers.get(opcode, None)
        if handler is None:
            raise exceptions.InvalidOPCode(
                _('invalid opcode given')
            )
        if payload is None:
            raise exceptions.InvalidPayload(
                _('empty payload given')
            )
        # NOTE(review): the handlers receive the whole message, so
        # they read `token` / `sequence` from its top level and not
        # from `payload` - confirm this is the intended protocol.
        await handler(data)

    async def dispatch_event(self, event: dict):
        """
        called when sending gateway commands or
        events. These events may not have the event
        name and the `data` object for the same is
        optional. The opcode defaults to DISPATCH.
        """
        await self.send(
            text_data=json.dumps(
                {
                    'opcode': event.get('opcode', self.DISPATCH),
                    'event': event.get('event', None),
                    'data': event.get('data', None)
                }
            )
        )

    async def heartbeat(self, data: dict):
        """
        touch the connection and send
        back response to indicate that
        we've received the heartbeat.
        Raises AuthenticationFailed when the
        client has not identified yet and
        InvalidPayload for a wrong sequence.
        """
        if self.connection is None:
            # nothing to touch before a successful identify
            raise exceptions.AuthenticationFailed(
                _('not identified')
            )
        sequence = data.get('sequence', None)
        # compare by value: identity (`is`) is only
        # reliable for the small integers CPython caches
        if sequence != self.last_heartbeat:
            raise exceptions.InvalidPayload(
                _('invalid sequence')
            )
        self.last_heartbeat += 1
        await self._run_db(self.connection.touch)
        await self.heartbeat_ack()

    async def identify(self, data: dict):
        """
        authenticate the user and provide
        a valid state via ready if the given
        credentials were valid
        """
        user = self.scope.get('user')
        token = data.get('token', None)
        # the scope has no user at all when no
        # authentication middleware is installed
        if getattr(user, 'is_authenticated', False):
            # we cannot authenticate the user once
            # again. Close the session and wait for a
            # fresh identify packet from the client
            raise exceptions.AlreadyAuthenticated(
                _('already authenticated')
            )
        if token is None:
            raise exceptions.InvalidPayload(
                _('token not given')
            )
        await self._run_db(self.authenticate_scope, token)
        self.connection = await self._run_db(
            Connection.objects.create,
            # create a connection object for the user
            # with the given channel id. This ensures
            # that we can get rid of stale connections
            # periodically by pruning with celery
            # Users must send heart beats to ensure
            # they don't get disconnected by the gateway
            # The user is read back from the scope: the local
            # `user` still refers to the pre-authentication value
            user=self.scope['user'],
            channel_name=self.channel_name
        )
        await self.connection.connect_buckets()
        await self.ready()

    async def heartbeat_ack(self):
        """
        indicate that we've received
        the user's heartbeat. Clients
        can use this event to detect
        unusual connections
        """
        await self._send_to_self(
            {
                'type': 'dispatch_event',
                'opcode': self.HEARTBEAT_ACK
            }
        )

    async def ready(self):
        """
        indicate that the client is in a
        connected state and is ready to
        receive gateway events
        """
        await self._send_to_self(
            {
                'type': 'dispatch_event',
                'opcode': self.READY,
            }
        )

    async def hello(self):
        """
        indicate that the user's
        connection has been accepted
        """
        await self._send_to_self(
            {
                'type': 'dispatch_event',
                'opcode': self.HELLO,
                'data': {
                    'interval': settings.CONNECTION_MAX_AGE
                }
            }
        )
这是我的消费者(consumer)类。
这是我的连接模型,代表一个活动的并发连接。
class Connection(CoreModel):
    """
    An active gateway connection of a user, identified
    by the channel name of the consumer serving it.
    """

    channel_name = models.CharField(
        verbose_name=_('channel name'),
        help_text=_("event stream channel name"),
        max_length=255,
        unique=True
    )
    user = models.ForeignKey(
        to=settings.AUTH_USER_MODEL,
        verbose_name=_('user'),
        on_delete=models.CASCADE
    )
    # set on creation and refreshed by `touch`
    last_seen = models.DateTimeField(
        verbose_name=_('last seen'),
        help_text=_("when the last heartbeat was received at"),
        auto_now_add=True
    )

    objects = ConnectionsManager()

    class Meta(CoreModel.Meta):
        verbose_name = _('connection')
        verbose_name_plural = _('connections')

    def __str__(self):
        return self.channel_name

    def touch(self):
        """
        updates the `last_seen` field
        to the current time.
        """
        self.last_seen = now()
        # write only the column that changed
        self.save(update_fields=['last_seen'])

    async def _locker_groups(self):
        """
        returns the group name of every locker the user is a
        member of. Group names are the locker ids converted to
        strings, which is what event senders in this module use.
        """
        # imported here so the model does not rely on an import
        # that may be missing from the module header
        from channels.db import database_sync_to_async

        def collect():
            return [
                str(member.locker.id)
                for member in self.user.member_set.all()
            ]

        # the queryset must not be evaluated on the event loop
        return await database_sync_to_async(collect)()

    async def connect_buckets(self):
        """
        helper function to add the connection to
        all the locker-channels the user is in.
        """
        for group in await self._locker_groups():
            await channel_layer.group_add(
                group=group,
                channel=self.channel_name
            )

    async def disconnect_buckets(self):
        """
        helper function to remove the connection
        from all the locker-channels the user is in.
        """
        for group in await self._locker_groups():
            await channel_layer.group_discard(
                group=group,
                channel=self.channel_name
            )
每当用户连接到 WebSocket 网关时,我都会按其所属储物柜的 ID 把该连接加入对应的群组(例如,用户同时是储物柜 A 和 B 的成员,则会加入群组 'A.id' 和 'B.id')。
我发送这样的事件。
def on_upload_save(instance=None, **kwargs):
    """
    Broadcast a saved upload to its locker's group.

    Sends `UPLOAD_CREATE` when the `created` keyword argument is truthy
    and `UPLOAD_UPDATE` otherwise; the event data is the upload
    serialized with `PartialUploadSerializer`.
    """
    was_created = kwargs.pop('created', False)
    event_name = 'UPLOAD_CREATE' if was_created else 'UPLOAD_UPDATE'

    # the handler is synchronous, so the coroutine is wrapped
    broadcast = async_to_sync(channel_layer.group_send)
    outcome = broadcast(
        str(instance.locker.id),
        {
            'type': 'dispatch_event',
            'event': event_name,
            'data': PartialUploadSerializer(instance).data
        }
    )
    # only the "created" case hands the result of the send back
    return outcome if was_created else None
问题是,在发送这些事件的同时,我还想检查权限。
例如,只有拥有 READ_UPLOADS
权限的用户才能收到 UPLOAD_CREATE
和 UPLOAD_UPDATE
事件。
用户对象上有一个方法,可以检查该用户是否拥有某项权限,例如
user.has_permission('UPLOAD_CREATE')
如何实现这样的系统?
在您的 async def dispatch_event(self, event: dict):
方法中,您可以通过 self.scope["user"]
获取当前用户,因此可以在这里按权限过滤事件。
或者,您可以为每种类型的消息创建一个群组,然后只让用户订阅其有权查看的群组。(这是更好的解决方案。)