如何接收连续的数据流
How to receive continuous stream of data
我已经创建了一个 django-server(使用 django-channels),连续的数据流将被发送到连接客户端的通道层。
下面的代码代表客户端,其中“generate.sepsis”会触发服务器端的函数在通道上发送json;我只是从服务器接收所有传输的数据并将其打印到控制台。
async def receive_data_from_start_sepsis():
ws_pat=websocket.WebSocket()
ws_pat.connect('ws://localhost:8000/sepsisDynamic/?token=1fe10f828b00e170b3a9c5d41fc168a31facefc3')
#time.sleep(7)
await ws_pat.send(json.dumps({
'type':'generate.sepsis',
'data': {
"heart_rate": 55,
"oxy_saturation": 26.5,
"temperature": 50,
"blood_pressure": 95.48,
"resp_rate": 156,
"mean_art_pre": 85,
"user_id": 15
}
}))
#time.sleep(2)
while True:
greeting = await ws_pat.recv()
print(f"< {greeting}")
asyncio.sleep(2)
# asyncio.run(receive_data_from_start_sepsis())
try:
asyncio.get_event_loop().run_forever()
finally:
asyncio.get_event_loop().run_until_complete(receive_data_from_start_sepsis())
但我收到以下错误
---------------------------------------------------------------------------
RuntimeError Traceback (most recent call last)
<ipython-input-6-9fdd3245dd6e> in <module>
24 try:
---> 25 asyncio.get_event_loop().run_forever()
26 finally:
~\anaconda3\lib\asyncio\base_events.py in run_forever(self)
524 if self.is_running():
--> 525 raise RuntimeError('This event loop is already running')
526 if events._get_running_loop() is not None:
RuntimeError: This event loop is already running
During handling of the above exception, another exception occurred:
RuntimeError Traceback (most recent call last)
<ipython-input-6-9fdd3245dd6e> in <module>
25 asyncio.get_event_loop().run_forever()
26 finally:
---> 27 asyncio.get_event_loop().run_until_complete(receive_data_from_start_sepsis())
~\anaconda3\lib\asyncio\base_events.py in run_until_complete(self, future)
568 future.add_done_callback(_run_until_complete_cb)
569 try:
--> 570 self.run_forever()
571 except:
572 if new_task and future.done() and not future.cancelled():
~\anaconda3\lib\asyncio\base_events.py in run_forever(self)
523 self._check_closed()
524 if self.is_running():
--> 525 raise RuntimeError('This event loop is already running')
526 if events._get_running_loop() is not None:
527 raise RuntimeError(
RuntimeError: This event loop is already running
但是当服务器上的异步代码完成其(迭代)发送数据时;套接字像这样接收所有数据。
(这是发送的第一个数据项。)
{"type": "echo.message", "data": {"id": 147, "heart_rate": 155.0, "oxy_saturation": 150.0, "temperature": 43.0, "blood_pressure": 94.0, "resp_rate": 174.0, "mean_art_pre": 186.0, "patient": 10}}
django中的异步函数是:-
async def generating_patient_sepsis(self, message):
# get the data from message
data = message.get('data')
print(f"THE INITIAL DATA {data}")
# get the patient's id
get_pat_id_in_data = await self._convert_user_id_to_patient_id(data)
data = get_pat_id_in_data
while True:
time.sleep(5)
await asyncio.sleep(1)
# random sepsis data generated and `data` variable is mutated
data.update({'heart_rate': random.randint(24, 200)})
data.update({'oxy_saturation': random.randint(24, 200)})
data.update({'temperature': random.randint(24, 200)})
data.update({'blood_pressure': random.randint(24, 200)})
data.update({'resp_rate': random.randint(24, 200)})
data.update({'mean_art_pre': random.randint(24, 200)})
print(f"THE DATA --> {data}")
# serializing and saving the data
x = await self.serializer_checking_saving_data(data)
# send the data to the channel
await self.channel_layer.group_send(
group=self.pat_grp_id,
message={
'type': 'echo.message',
'data': x
}
)
我还想了解如何在 javascript 中接收相同的数据,以便我可以用动态图形方式表示变化;
测试失败的原因
greeting = await ws_pat.recv()
是因为websocket接收函数是一个同步函数,我一直在等待它。
如果我选择删除 await 关键字,它将接收数据流,但它只保留 json-data.
流中的第一个值
我能够接收这些 json 数据的方式是定义一个异步函数并接收它们的 websocket 数据;所以每当 websocket 数据将从服务器广播时。
async def receive_sepsis(ws_pat):
return ws_pat.recv()
此实现的问题是广播的事件循环永远不会完成,因为 while 循环为 True;因此,所有从服务器广播到群组的 json 数据(即 self.pat_grp_id)都会被阻塞在
await self.channel_layer.group_send(<grp-name>,<message>)
任何人都可以帮助我,我应该如何实际处理这个问题;
我只想将患者和医生连接到同一组(即患者模型中的 grp_id 属性,这是一个 UUID 字段);连接后,患者将在 websocket 上请求“start_diagnosis”;这将生成疾病的伪随机数据;保存在数据库中,returns 相同数据的序列化版本,然后将其广播到具有患者 grp_id 的组名。
我已经创建了一个 django-server(使用 django-channels),连续的数据流将被发送到连接客户端的通道层。
下面的代码代表客户端,其中“generate.sepsis”会触发服务器端的函数在通道上发送json;我只是从服务器接收所有传输的数据并将其打印到控制台。
async def receive_data_from_start_sepsis():
ws_pat=websocket.WebSocket()
ws_pat.connect('ws://localhost:8000/sepsisDynamic/?token=1fe10f828b00e170b3a9c5d41fc168a31facefc3')
#time.sleep(7)
await ws_pat.send(json.dumps({
'type':'generate.sepsis',
'data': {
"heart_rate": 55,
"oxy_saturation": 26.5,
"temperature": 50,
"blood_pressure": 95.48,
"resp_rate": 156,
"mean_art_pre": 85,
"user_id": 15
}
}))
#time.sleep(2)
while True:
greeting = await ws_pat.recv()
print(f"< {greeting}")
asyncio.sleep(2)
# asyncio.run(receive_data_from_start_sepsis())
try:
asyncio.get_event_loop().run_forever()
finally:
asyncio.get_event_loop().run_until_complete(receive_data_from_start_sepsis())
但我收到以下错误
---------------------------------------------------------------------------
RuntimeError Traceback (most recent call last)
<ipython-input-6-9fdd3245dd6e> in <module>
24 try:
---> 25 asyncio.get_event_loop().run_forever()
26 finally:
~\anaconda3\lib\asyncio\base_events.py in run_forever(self)
524 if self.is_running():
--> 525 raise RuntimeError('This event loop is already running')
526 if events._get_running_loop() is not None:
RuntimeError: This event loop is already running
During handling of the above exception, another exception occurred:
RuntimeError Traceback (most recent call last)
<ipython-input-6-9fdd3245dd6e> in <module>
25 asyncio.get_event_loop().run_forever()
26 finally:
---> 27 asyncio.get_event_loop().run_until_complete(receive_data_from_start_sepsis())
~\anaconda3\lib\asyncio\base_events.py in run_until_complete(self, future)
568 future.add_done_callback(_run_until_complete_cb)
569 try:
--> 570 self.run_forever()
571 except:
572 if new_task and future.done() and not future.cancelled():
~\anaconda3\lib\asyncio\base_events.py in run_forever(self)
523 self._check_closed()
524 if self.is_running():
--> 525 raise RuntimeError('This event loop is already running')
526 if events._get_running_loop() is not None:
527 raise RuntimeError(
RuntimeError: This event loop is already running
但是当服务器上的异步代码完成其(迭代)发送数据时;套接字像这样接收所有数据。 (这是发送的第一个数据项。)
{"type": "echo.message", "data": {"id": 147, "heart_rate": 155.0, "oxy_saturation": 150.0, "temperature": 43.0, "blood_pressure": 94.0, "resp_rate": 174.0, "mean_art_pre": 186.0, "patient": 10}}
django中的异步函数是:-
async def generating_patient_sepsis(self, message):
# get the data from message
data = message.get('data')
print(f"THE INITIAL DATA {data}")
# get the patient's id
get_pat_id_in_data = await self._convert_user_id_to_patient_id(data)
data = get_pat_id_in_data
while True:
time.sleep(5)
await asyncio.sleep(1)
# random sepsis data generated and `data` variable is mutated
data.update({'heart_rate': random.randint(24, 200)})
data.update({'oxy_saturation': random.randint(24, 200)})
data.update({'temperature': random.randint(24, 200)})
data.update({'blood_pressure': random.randint(24, 200)})
data.update({'resp_rate': random.randint(24, 200)})
data.update({'mean_art_pre': random.randint(24, 200)})
print(f"THE DATA --> {data}")
# serializing and saving the data
x = await self.serializer_checking_saving_data(data)
# send the data to the channel
await self.channel_layer.group_send(
group=self.pat_grp_id,
message={
'type': 'echo.message',
'data': x
}
)
我还想了解如何在 javascript 中接收相同的数据,以便我可以用动态图形方式表示变化;
greeting = await ws_pat.recv()
是因为websocket接收函数是一个同步函数,我一直在等待它。 如果我选择删除 await 关键字,它将接收数据流,但它只保留 json-data.
流中的第一个值我能够接收这些 json 数据的方式是定义一个异步函数并接收它们的 websocket 数据;所以每当 websocket 数据将从服务器广播时。
async def receive_sepsis(ws_pat):
return ws_pat.recv()
此实现的问题是广播的事件循环永远不会完成,因为 while 循环为 True;因此,所有从服务器广播到群组的 json 数据(即 self.pat_grp_id)都会被阻塞在
await self.channel_layer.group_send(<grp-name>,<message>)
任何人都可以帮助我,我应该如何实际处理这个问题;
我只想将患者和医生连接到同一组(即患者模型中的 grp_id 属性,这是一个 UUID 字段);连接后,患者将在 websocket 上请求“start_diagnosis”;这将生成疾病的伪随机数据;保存在数据库中,returns 相同数据的序列化版本,然后将其广播到具有患者 grp_id 的组名。