如何创建连续发送数据并在消费者 django 通道 2 中安全断开连接的任务?
How to create task which sends data continuously and disconnect safely in consumers, django channels 2?
来自 this 的答案,这有助于每 n
秒从 consumers
发送数据。
尝试使用 creat_task
方法正确处理断开连接,尝试通过发送 [=17] 停止 while-loop
(用于每 n
秒发送一次数据) =](假设,这个标志没有发送到创建任务的同一个实例)。
consumers.py:
class AutoUpdateConsumer(AsyncConsumer):
async def websocket_connect(self, event):
print("connected", event)
await self.send({
"type": "websocket.accept"
})
await self.create_task(True)
async def websocket_receive(self, event):
print("receive", event)
async def websocket_disconnect(self, event):
await self.create_task(False)
print("disconnected", event)
async def create_task(self, flag=True):
while flag:
await asyncio.sleep(2)
df= pd.DataFrame(data=[random.sample(range(100), 4) for _ in range(5)])
await self.send({
'type': 'websocket.send',
'text': df.to_html(),
})
警告:
2019-09-11 14:40:06,400 - WARNING - server - Application instance
<Task pending coro=<SessionMiddlewareInstance.__call__() running at
D:\Django\Django channels\django_channels_env\lib\site-packages\channels\sessions.py:175>
wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at
0x000001870E06C618>()] for connection <WebSocketProtocol client=
['127.0.0.1', 63789] path=b'/ws/home'> took too long to shut down and was
killed.
如何安全地 stop_task 而不是等待 channels
终止任务?
或
如何在同一个 class 中的另一个方法中停止方法中的无限循环 运行?
版本:
- Django == 2.0.7
- 频道 == 2.1.2
我建议在连接到消费者时创建一个组。这样,只要知道组名 (auto_update
).
,就可以从 django 项目中的任何位置触发消息
from channels.generic.websocket import AsyncWebsocketConsumer
class AutoUpdateConsumer(AsyncWebsocketConsumer):
async def connect(self):
print('connect')
# join the group
self.group_name = 'auto_update'
await self.channel_layer.group_add(
self.group_name,
self.channel_name
)
await self.accept()
async def disconnect(self, event):
print('disconnect')
# leave the group
await self.channel_layer.group_discard(
self.group_name,
self.channel_name
)
async def receive(self, event):
print('receive')
async def auto_update(self, event):
print('sending df')
df = event['df']
await self.send({
'text': df
})
要发送消息,我会使用自定义 management command。要停止该命令,我将创建一个单例模型(只有一个实例的模型),该模型具有一个布尔字段,可以定期检查该字段以查看是否应停止循环。
首先使用get_channel_layer()
获取与redis通信的active层,然后在循环中调用group_send
调用type
键指定的consumer方法
# /project/app/management/commands/auto_update.py
from django.core.management.base import BaseCommand
from asgiref.sync import async_to_sync
from channels.layers import get_channel_layer
from config.models import AutoUpdateSettings
class Command(BaseCommand):
help = 'Command to start auto updating'
def handle(self, *args, **kwargs):
settings = AutoUpdateSettings.objects.first()
settings.keep_running = True
settings.save()
group_name = 'auto_update'
channel_layer = get_channel_layer()
while True:
settings.refresh_from_db()
if not settings.keep_running:
break
df= pd.DataFrame(data=[random.sample(range(100), 4) for _ in range(5)])
async_to_sync(channel_layer.group_send)(
group_name,
{
'type': 'auto_update', # this is the name of your consumer method
'df': df.to_html()
}
)
要启动将消息发送到组的循环,您可以调用命令 python manage.py auto_update
。要停止命令,您可以使用管理页面并将 keep_running
设置为 false。
来自 this 的答案,这有助于每 n
秒从 consumers
发送数据。
尝试使用 creat_task
方法正确处理断开连接,尝试通过发送 [=17] 停止 while-loop
(用于每 n
秒发送一次数据) =](假设,这个标志没有发送到创建任务的同一个实例)。
consumers.py:
class AutoUpdateConsumer(AsyncConsumer):
async def websocket_connect(self, event):
print("connected", event)
await self.send({
"type": "websocket.accept"
})
await self.create_task(True)
async def websocket_receive(self, event):
print("receive", event)
async def websocket_disconnect(self, event):
await self.create_task(False)
print("disconnected", event)
async def create_task(self, flag=True):
while flag:
await asyncio.sleep(2)
df= pd.DataFrame(data=[random.sample(range(100), 4) for _ in range(5)])
await self.send({
'type': 'websocket.send',
'text': df.to_html(),
})
警告:
2019-09-11 14:40:06,400 - WARNING - server - Application instance
<Task pending coro=<SessionMiddlewareInstance.__call__() running at
D:\Django\Django channels\django_channels_env\lib\site-packages\channels\sessions.py:175>
wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at
0x000001870E06C618>()] for connection <WebSocketProtocol client=
['127.0.0.1', 63789] path=b'/ws/home'> took too long to shut down and was
killed.
如何安全地 stop_task 而不是等待 channels
终止任务?
或
如何在同一个 class 中的另一个方法中停止方法中的无限循环 运行?
版本:
- Django == 2.0.7
- 频道 == 2.1.2
我建议在连接到消费者时创建一个组。这样,只要知道组名 (auto_update
).
from channels.generic.websocket import AsyncWebsocketConsumer
class AutoUpdateConsumer(AsyncWebsocketConsumer):
async def connect(self):
print('connect')
# join the group
self.group_name = 'auto_update'
await self.channel_layer.group_add(
self.group_name,
self.channel_name
)
await self.accept()
async def disconnect(self, event):
print('disconnect')
# leave the group
await self.channel_layer.group_discard(
self.group_name,
self.channel_name
)
async def receive(self, event):
print('receive')
async def auto_update(self, event):
print('sending df')
df = event['df']
await self.send({
'text': df
})
要发送消息,我会使用自定义 management command。要停止该命令,我将创建一个单例模型(只有一个实例的模型),该模型具有一个布尔字段,可以定期检查该字段以查看是否应停止循环。
首先使用get_channel_layer()
获取与redis通信的active层,然后在循环中调用group_send
调用type
键指定的consumer方法
# /project/app/management/commands/auto_update.py
from django.core.management.base import BaseCommand
from asgiref.sync import async_to_sync
from channels.layers import get_channel_layer
from config.models import AutoUpdateSettings
class Command(BaseCommand):
help = 'Command to start auto updating'
def handle(self, *args, **kwargs):
settings = AutoUpdateSettings.objects.first()
settings.keep_running = True
settings.save()
group_name = 'auto_update'
channel_layer = get_channel_layer()
while True:
settings.refresh_from_db()
if not settings.keep_running:
break
df= pd.DataFrame(data=[random.sample(range(100), 4) for _ in range(5)])
async_to_sync(channel_layer.group_send)(
group_name,
{
'type': 'auto_update', # this is the name of your consumer method
'df': df.to_html()
}
)
要启动将消息发送到组的循环,您可以调用命令 python manage.py auto_update
。要停止命令,您可以使用管理页面并将 keep_running
设置为 false。