如何使用 Django Channels 进行 HTTP 长轮询
How to do HTTP long polling with Django Channels
我正在尝试为 Web 请求实现 HTTP 长轮询,但似乎无法在 Channels 文档中找到合适的示例,一切都是关于 Web Sockets 的。
我在消费 HTTP 消息时需要做的是:
- 等待保存特定模型时将发送的群组消息(可能使用信号)
- 等待超时,如果没有收到消息
然后 return 给客户一些东西。
现在我有可以在示例中看到的代码:
def http_consumer(message):
# Make standard HTTP response - access ASGI path attribute directly
response = HttpResponse("Hello world! You asked for %s" % message.content['path'])
# Encode that response into message format (ASGI)
for chunk in AsgiHandler.encode_response(response):
message.reply_channel.send(chunk)
所以我必须 return 在此 http_consumer
中添加一些内容,表明我目前没有任何内容可发送,但我不能在这里阻止。也许我不能 return 什么?然后我必须在特定组上捕获新消息,或者达到超时,并将响应发送给客户端。
看来我需要将 message.reply_channel
存储在某个地方,以便我以后可以回复,但我不知道如何:
- 捕获群消息并生成响应
- 当没有收到消息时(超时)生成响应,也许延迟服务器可以在这里工作?
所以,我最终这样做的方式如下所述。
在消费者中,如果我发现我没有立即发送响应,我会将 message.reply_channel
存储在一个组中,在相关事件的情况下会收到通知,并安排一个延迟消息将在达到最大等待时间时触发。
group_name = group_name_from_mac(mac_address)
Group(group_name).add(message.reply_channel)
message.channel_session['will_wait'] = True
delayed_message = {
'channel': 'long_polling_terminator',
'content': {'mac_address': mac_address,
'reply_channel': message.reply_channel.name,
'group_name': group_name},
'delay': settings.LONG_POLLING_TIMEOUT
}
Channel('asgi.delay').send(delayed_message, immediately=True)
然后,可能会发生两件事。我们要么在相关组上收到消息并提前发送响应,要么延迟消息到达,表明我们已经耗尽了必须等待的时间并且必须 return 响应表明没有事件。
为了在相关事件发生时触发消息,我依赖于 Django 信号:
class PortalConfig(AppConfig):
name = 'portal'
def ready(self):
from .models import STBMessage
post_save.connect(notify_new_message, sender=STBMessage)
def notify_new_message(sender, **kwargs):
mac_address = kwargs['instance'].set_top_box.id
layer = channel_layers['default']
group_name = group_name_from_mac(mac_address)
response = JsonResponse({'error': False, 'new_events': True})
group = Group(group_name)
for chunk in AsgiHandler.encode_response(response):
group.send(chunk)
当超时到期时,我在 long_polling_terminator
通道上收到一条消息,我需要发送一条消息表明没有事件:
def long_polling_terminator(message):
reply_channel = Channel(message['reply_channel'])
group_name = message['group_name']
mac_address = message['mac_address']
layer = channel_layers['default']
boxes = layer.group_channels(group_name)
if message['reply_channel'] in boxes:
response = JsonResponse({'error': False, 'new_events': False})
write_http_response(response, reply_channel)
return
最后要做的是从组中删除此 reply_channel,我在 http.disconnect
消费者中执行此操作:
def process_disconnect(message, group_name_from_mac):
if message.channel_session.get('will_wait', False):
reply_channel = Channel(message['reply_channel'])
mac_address = message.channel_session['mac_address']
group_name = group_name_from_mac(mac_address)
Group(group_name).discard(reply_channel)
我正在尝试为 Web 请求实现 HTTP 长轮询,但似乎无法在 Channels 文档中找到合适的示例,一切都是关于 Web Sockets 的。
我在消费 HTTP 消息时需要做的是:
- 等待保存特定模型时将发送的群组消息(可能使用信号)
- 等待超时,如果没有收到消息
然后 return 给客户一些东西。
现在我有可以在示例中看到的代码:
def http_consumer(message):
# Make standard HTTP response - access ASGI path attribute directly
response = HttpResponse("Hello world! You asked for %s" % message.content['path'])
# Encode that response into message format (ASGI)
for chunk in AsgiHandler.encode_response(response):
message.reply_channel.send(chunk)
所以我必须 return 在此 http_consumer
中添加一些内容,表明我目前没有任何内容可发送,但我不能在这里阻止。也许我不能 return 什么?然后我必须在特定组上捕获新消息,或者达到超时,并将响应发送给客户端。
看来我需要将 message.reply_channel
存储在某个地方,以便我以后可以回复,但我不知道如何:
- 捕获群消息并生成响应
- 当没有收到消息时(超时)生成响应,也许延迟服务器可以在这里工作?
所以,我最终这样做的方式如下所述。
在消费者中,如果我发现我没有立即发送响应,我会将 message.reply_channel
存储在一个组中,在相关事件的情况下会收到通知,并安排一个延迟消息将在达到最大等待时间时触发。
group_name = group_name_from_mac(mac_address)
Group(group_name).add(message.reply_channel)
message.channel_session['will_wait'] = True
delayed_message = {
'channel': 'long_polling_terminator',
'content': {'mac_address': mac_address,
'reply_channel': message.reply_channel.name,
'group_name': group_name},
'delay': settings.LONG_POLLING_TIMEOUT
}
Channel('asgi.delay').send(delayed_message, immediately=True)
然后,可能会发生两件事。我们要么在相关组上收到消息并提前发送响应,要么延迟消息到达,表明我们已经耗尽了必须等待的时间并且必须 return 响应表明没有事件。
为了在相关事件发生时触发消息,我依赖于 Django 信号:
class PortalConfig(AppConfig):
name = 'portal'
def ready(self):
from .models import STBMessage
post_save.connect(notify_new_message, sender=STBMessage)
def notify_new_message(sender, **kwargs):
mac_address = kwargs['instance'].set_top_box.id
layer = channel_layers['default']
group_name = group_name_from_mac(mac_address)
response = JsonResponse({'error': False, 'new_events': True})
group = Group(group_name)
for chunk in AsgiHandler.encode_response(response):
group.send(chunk)
当超时到期时,我在 long_polling_terminator
通道上收到一条消息,我需要发送一条消息表明没有事件:
def long_polling_terminator(message):
reply_channel = Channel(message['reply_channel'])
group_name = message['group_name']
mac_address = message['mac_address']
layer = channel_layers['default']
boxes = layer.group_channels(group_name)
if message['reply_channel'] in boxes:
response = JsonResponse({'error': False, 'new_events': False})
write_http_response(response, reply_channel)
return
最后要做的是从组中删除此 reply_channel,我在 http.disconnect
消费者中执行此操作:
def process_disconnect(message, group_name_from_mac):
if message.channel_session.get('will_wait', False):
reply_channel = Channel(message['reply_channel'])
mac_address = message.channel_session['mac_address']
group_name = group_name_from_mac(mac_address)
Group(group_name).discard(reply_channel)