如何使用 Django Channels 发送 websocket 延迟组消息?
How to send websocket deferred group messages with Django Channels?
我正在 Django Channels 上创建浏览器游戏。其中的玩家必须在 1 分钟内回答问题。首先,我发送带有问题的事件,然后想延迟一分钟发送消息。
我正在使用 RedisChannelLayer 和 JsonWebsocketConsumer。
class RoomConsumer(JsonWebsocketConsumer):
def connect(self):
room_name = self.scope['url_route']['kwargs']['room_name']
self.room_group_name = 'game_' + room_name
async_to_sync(self.channel_layer.group_add)(self.room_group_name, self.channel_name)
self.accept()
self.send_json(connection_event())
def send_message(self, event):
self.send_json(event.get('data'))
def receive_json(self, data, **kwargs):
try:
print(data)
event_type = data.get('eventType')
if event_type == 'start':
tasks = []
...
async_to_sync(self.channel_layer.group_send)(self.room_group_name, {
'type': 'send_message',
'data': start_event(tasks)
})
# here want to send deferred message
except Exception as e:
print(e)
计划通过异步睡眠来完成,但需要能够暂停并播放此消息或停止并以新的延迟重新发送。
async def send_delayed_message(self, event):
await sleep(60)
await self.send_json(event.get('data'))
您是否遇到过在没有停止消费者的情况下延迟发送此类消息的情况?
我会用 celery 来完成 shared_task
。
第 1 步:
在tasks.py
中写一个发送延迟消息的函数:
from celery import shared_task
from channels.layers import get_channel_layer
@shared_task
def send_delayed_message(room_group_name, message):
channel_layer = get_channel_layer()
async_to_sync(channel_layer.group_send)(room_group_name, {"type": "send_message", "room_group_name": room_group_name, "message": {"message": message})
第 2 步:
将延迟属性添加到您的数据对象:
data["delay"] = 60 #in seconds
第 3 步:
使用延迟属性从您的 RoomConsumer 调用函数:
async def send_delayed_message(self, event):
data = event.get("data")
message = data.get("message")
room_id = self.room_group_name
delay = data.get("delay")
if delay:
task = await send_delayed_message.apply_async(args=[room_id, message], countdown=delay)
return task
第 4 步:
不是“暂停”任务,而是结束它并重新发送相同的消息。保持暂停和停止的逻辑相同。
from proj.celery import app
def end_celery_message(task_id):
app.control.revoke(task_id, terminate=True)
我正在 Django Channels 上创建浏览器游戏。其中的玩家必须在 1 分钟内回答问题。首先,我发送带有问题的事件,然后想延迟一分钟发送消息。
我正在使用 RedisChannelLayer 和 JsonWebsocketConsumer。
class RoomConsumer(JsonWebsocketConsumer):
def connect(self):
room_name = self.scope['url_route']['kwargs']['room_name']
self.room_group_name = 'game_' + room_name
async_to_sync(self.channel_layer.group_add)(self.room_group_name, self.channel_name)
self.accept()
self.send_json(connection_event())
def send_message(self, event):
self.send_json(event.get('data'))
def receive_json(self, data, **kwargs):
try:
print(data)
event_type = data.get('eventType')
if event_type == 'start':
tasks = []
...
async_to_sync(self.channel_layer.group_send)(self.room_group_name, {
'type': 'send_message',
'data': start_event(tasks)
})
# here want to send deferred message
except Exception as e:
print(e)
计划通过异步睡眠来完成,但需要能够暂停并播放此消息或停止并以新的延迟重新发送。
async def send_delayed_message(self, event):
await sleep(60)
await self.send_json(event.get('data'))
您是否遇到过在没有停止消费者的情况下延迟发送此类消息的情况?
我会用 celery 来完成 shared_task
。
第 1 步:
在tasks.py
中写一个发送延迟消息的函数:
from celery import shared_task
from channels.layers import get_channel_layer
@shared_task
def send_delayed_message(room_group_name, message):
channel_layer = get_channel_layer()
async_to_sync(channel_layer.group_send)(room_group_name, {"type": "send_message", "room_group_name": room_group_name, "message": {"message": message})
第 2 步:
将延迟属性添加到您的数据对象:
data["delay"] = 60 #in seconds
第 3 步:
使用延迟属性从您的 RoomConsumer 调用函数:
async def send_delayed_message(self, event):
data = event.get("data")
message = data.get("message")
room_id = self.room_group_name
delay = data.get("delay")
if delay:
task = await send_delayed_message.apply_async(args=[room_id, message], countdown=delay)
return task
第 4 步:
不是“暂停”任务,而是结束它并重新发送相同的消息。保持暂停和停止的逻辑相同。
from proj.celery import app
def end_celery_message(task_id):
app.control.revoke(task_id, terminate=True)