如何创建连续发送数据并在消费者 django 通道 2 中安全断开连接的任务?

How to create task which sends data continuously and disconnect safely in consumers, django channels 2?

来自 this 的答案,这有助于每 n 秒从 consumers 发送数据。

尝试使用 creat_task 方法正确处理断开连接,尝试通过发送 [=17] 停止 while-loop(用于每 n 秒发送一次数据) =](假设,这个标志没有发送到创建任务的同一个实例)。

consumers.py:

class AutoUpdateConsumer(AsyncConsumer):

    async def websocket_connect(self, event):
        print("connected", event)
        await self.send({
            "type": "websocket.accept"
        })
        await self.create_task(True)

    async def websocket_receive(self, event):
        print("receive", event)

    async def websocket_disconnect(self, event):
        await self.create_task(False)

        print("disconnected", event)


    async def create_task(self, flag=True):
        while flag:
            await asyncio.sleep(2)

            df= pd.DataFrame(data=[random.sample(range(100), 4) for _ in range(5)])

            await self.send({
                'type': 'websocket.send',
                'text': df.to_html(),
            })

警告:

2019-09-11 14:40:06,400 - WARNING - server - Application instance 
<Task pending coro=<SessionMiddlewareInstance.__call__() running at 
D:\Django\Django channels\django_channels_env\lib\site-packages\channels\sessions.py:175>
wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 
0x000001870E06C618>()] for connection <WebSocketProtocol client=
['127.0.0.1', 63789] path=b'/ws/home'> took too long to shut down and was 
killed.

如何安全地 stop_task 而不是等待 channels 终止任务?

如何在同一个 class 中的另一个方法中停止方法中的无限循环 运行?

版本:

我建议在连接到消费者时创建一个组。这样,只要知道组名 (auto_update).

,就可以从 django 项目中的任何位置触发消息
from channels.generic.websocket import AsyncWebsocketConsumer

class AutoUpdateConsumer(AsyncWebsocketConsumer):
    async def connect(self):
        print('connect')

        # join the group
        self.group_name = 'auto_update'
        await self.channel_layer.group_add(
            self.group_name,
            self.channel_name
        )
        await self.accept()

    async def disconnect(self, event):
        print('disconnect')

        # leave the group
        await self.channel_layer.group_discard(
            self.group_name,
            self.channel_name
        )

    async def receive(self, event):
        print('receive')

    async def auto_update(self, event):
        print('sending df')
        df = event['df']

        await self.send({
            'text': df
        })

要发送消息,我会使用自定义 management command。要停止该命令,我将创建一个单例模型(只有一个实例的模型),该模型具有一个布尔字段,可以定期检查该字段以查看是否应停止循环。

首先使用get_channel_layer()获取与redis通信的active层,然后在循环中调用group_send调用type键指定的consumer方法

# /project/app/management/commands/auto_update.py

from django.core.management.base import BaseCommand
from asgiref.sync import async_to_sync
from channels.layers import get_channel_layer
from config.models import AutoUpdateSettings

class Command(BaseCommand):
    help = 'Command to start auto updating'

    def handle(self, *args, **kwargs):
        settings = AutoUpdateSettings.objects.first()
        settings.keep_running = True
        settings.save()

        group_name = 'auto_update'
        channel_layer = get_channel_layer()

        while True:
            settings.refresh_from_db()
            if not settings.keep_running:
                break

            df= pd.DataFrame(data=[random.sample(range(100), 4) for _ in range(5)])
            async_to_sync(channel_layer.group_send)(
                group_name,
                {
                    'type': 'auto_update',  # this is the name of your consumer method
                    'df': df.to_html()
                }
            )

要启动将消息发送到组的循环,您可以调用命令 python manage.py auto_update。要停止命令,您可以使用管理页面并将 keep_running 设置为 false。