如何接收连续的数据流

How to receive continuous stream of data

我已经创建了一个 django-server(使用 django-channels),连续的数据流将被发送到连接客户端的通道层。

下面的代码代表客户端,其中“generate.sepsis”会触发服务器端的函数在通道上发送json;我只是从服务器接收所有传输的数据并将其打印到控制台。

async def receive_data_from_start_sepsis():
    ws_pat=websocket.WebSocket()
    ws_pat.connect('ws://localhost:8000/sepsisDynamic/?token=1fe10f828b00e170b3a9c5d41fc168a31facefc3')
    #time.sleep(7)
    await ws_pat.send(json.dumps({
    'type':'generate.sepsis',
    'data': {
                "heart_rate": 55,
                "oxy_saturation": 26.5,
                "temperature": 50,
                "blood_pressure": 95.48,
                "resp_rate": 156,
                "mean_art_pre": 85,
                "user_id": 15 
            }
    }))
    #time.sleep(2)
    while True:
        greeting = await ws_pat.recv()
        print(f"< {greeting}")
        asyncio.sleep(2)

# asyncio.run(receive_data_from_start_sepsis())
try:
    asyncio.get_event_loop().run_forever()
finally:
    asyncio.get_event_loop().run_until_complete(receive_data_from_start_sepsis())

但我收到以下错误

---------------------------------------------------------------------------
RuntimeError                              Traceback (most recent call last)
<ipython-input-6-9fdd3245dd6e> in <module>
     24 try:
---> 25     asyncio.get_event_loop().run_forever()
     26 finally:

~\anaconda3\lib\asyncio\base_events.py in run_forever(self)
    524         if self.is_running():
--> 525             raise RuntimeError('This event loop is already running')
    526         if events._get_running_loop() is not None:

RuntimeError: This event loop is already running

During handling of the above exception, another exception occurred:

RuntimeError                              Traceback (most recent call last)
<ipython-input-6-9fdd3245dd6e> in <module>
     25     asyncio.get_event_loop().run_forever()
     26 finally:
---> 27     asyncio.get_event_loop().run_until_complete(receive_data_from_start_sepsis())

~\anaconda3\lib\asyncio\base_events.py in run_until_complete(self, future)
    568         future.add_done_callback(_run_until_complete_cb)
    569         try:
--> 570             self.run_forever()
    571         except:
    572             if new_task and future.done() and not future.cancelled():

~\anaconda3\lib\asyncio\base_events.py in run_forever(self)
    523         self._check_closed()
    524         if self.is_running():
--> 525             raise RuntimeError('This event loop is already running')
    526         if events._get_running_loop() is not None:
    527             raise RuntimeError(

RuntimeError: This event loop is already running

但是当服务器上的异步代码完成其(迭代)发送数据时;套接字像这样接收所有数据。 (这是发送的第一个数据项。)

{"type": "echo.message", "data": {"id": 147, "heart_rate": 155.0, "oxy_saturation": 150.0, "temperature": 43.0, "blood_pressure": 94.0, "resp_rate": 174.0, "mean_art_pre": 186.0, "patient": 10}}

django中的异步函数是:-

async def generating_patient_sepsis(self, message):
        # get the data from message
        data = message.get('data')
        print(f"THE INITIAL DATA {data}")
        # get the patient's id
        get_pat_id_in_data = await self._convert_user_id_to_patient_id(data)
        data = get_pat_id_in_data
        while True:
            time.sleep(5)
            await asyncio.sleep(1)
            # random sepsis data generated and `data` variable is mutated
            data.update({'heart_rate': random.randint(24, 200)})
            data.update({'oxy_saturation': random.randint(24, 200)})
            data.update({'temperature': random.randint(24, 200)})
            data.update({'blood_pressure': random.randint(24, 200)})
            data.update({'resp_rate': random.randint(24, 200)})
            data.update({'mean_art_pre': random.randint(24, 200)})
            print(f"THE DATA  --> {data}")
            # serializing and saving the data
            x = await self.serializer_checking_saving_data(data)
            # send the data to the channel
            await self.channel_layer.group_send(
                group=self.pat_grp_id,
                message={
                    'type': 'echo.message',
                    'data': x
                }
            )

我还想了解如何在 javascript 中接收相同的数据,以便我可以用动态图形方式表示变化;

测试失败的原因
greeting = await ws_pat.recv()

是因为websocket接收函数是一个同步函数,我一直在等待它。 如果我选择删除 await 关键字,它将接收数据流,但它只保留 json-data.

流中的第一个值

我能够接收这些 json 数据的方式是定义一个异步函数并接收它们的 websocket 数据;所以每当 websocket 数据将从服务器广播时。

async def receive_sepsis(ws_pat):
    return ws_pat.recv()

此实现的问题是广播的事件循环永远不会完成,因为 while 循环为 True;因此,所有从服务器广播到群组的 json 数据(即 self.pat_grp_id)都会被阻塞在

await self.channel_layer.group_send(<grp-name>,<message>)

任何人都可以帮助我,我应该如何实际处理这个问题;
我只想将患者和医生连接到同一组(即患者模型中的 grp_id 属性,这是一个 UUID 字段);连接后,患者将在 websocket 上请求“start_diagnosis”;这将生成疾病的伪随机数据;保存在数据库中,returns 相同数据的序列化版本,然后将其广播到具有患者 grp_id 的组名。