如何在 asyncio.gather(*tasks) 中检测关闭的 websocket
How to detect closed websocket in asyncio.gather(*tasks)
我有一个异步任务列表，其中每个任务都负责连接、握手以及从 websocket 接收数据。这个过程运行正常，但有时其中一个（或全部）websocket 的连接会关闭。
我怎样才能检测到已关闭的连接，并重新建立新的连接？
这是我使用的代码:
async def main(_id):
    """Subscribe to market ``_id`` over a websocket and print every message.

    Opens a connection to ``wss://ws.bitpin.ir/``, performs the two-step
    subscription handshake, starts ``ping_func`` as a background keepalive
    task and then receives messages forever, handing each one to
    ``return_func`` together with a running counter and the current task name.

    Args:
        _id: Market identifier sent in the ``sub_to_market`` request.

    Returns:
        None. The coroutine only returns when the handshake reply is not the
        expected one or when an exception (for example a closed connection)
        ends the receive loop; both cases are reported through ``err_log``.
    """
    try:
        # ping_interval=None turns off the websockets library's own keepalive
        # pings; ping_func sends application-level PING messages instead.
        async with websockets.connect(
            "wss://ws.bitpin.ir/",
            extra_headers=request_header,
            timeout=10,
            ping_interval=None,
        ) as websocket:
            await websocket.send('{"method":"sub_to_price_info"}')
            recv_msg = await websocket.recv()
            if recv_msg != '{"message": "sub to price info"}':
                # Report the failed handshake instead of ending silently, so a
                # task that stops here can be told apart from a healthy one.
                err_log(
                    name='Error in main function',
                    text='Unexpected subscription reply: {}'.format(recv_msg),
                )
                return

            await websocket.send(json.dumps({"method": "sub_to_market", "id": _id}))
            recv_msg = await websocket.recv()
            print(recv_msg)

            counter = 1
            task = asyncio.create_task(ping_func(websocket))
            try:
                while True:
                    msg = await websocket.recv()
                    # Report the received message.
                    return_func(msg, counter, asyncio.current_task().get_name())
                    counter += 1
            finally:
                # Stop the keepalive task as soon as the receive loop ends,
                # rather than leaving it to fail on its next send.
                task.cancel()
    except Exception as e:
        err_log(name='Error in main function', text=str(e))
async def ping_func(websocket):
    """Send an application-level PING message on ``websocket`` every 5 seconds.

    Loops until sending or sleeping raises. Any ``Exception`` is reported
    through ``err_log`` and the coroutine then returns.

    Args:
        websocket: Connection object exposing an awaitable ``send`` method.
    """
    ping_payload = '{"message":"PING"}'
    interval_seconds = 5
    try:
        while True:
            await websocket.send(ping_payload)
            await asyncio.sleep(interval_seconds)
    except Exception as exc:
        err_log(name='Error in ping function', text=str(exc))
def return_func(msg, counter, task_name):
    """Print a one-line summary of a received websocket message.

    Messages other than the PONG reply whose decoded JSON value has fewer
    than 20 elements are printed with their first 100 characters. Everything
    else (PONG replies, large payloads, and messages that are not JSON or
    decode to a value without a length) is printed as task name and counter
    only.

    Args:
        msg: Raw message as received from the websocket.
        counter: Sequence number of the message within its task.
        task_name: Name of the asyncio task that received the message.
    """
    is_short_payload = False
    if msg != '{"message": "PONG"}':
        try:
            is_short_payload = len(json.loads(msg)) < 20
        except (ValueError, TypeError):
            # Not valid JSON, or a JSON scalar with no len(): treat it like a
            # large payload so one odd frame cannot abort the caller's loop.
            is_short_payload = False

    if is_short_payload:
        print(task_name, counter, msg[:100])
    else:
        print(task_name, counter)
async def handler():
    """Start one ``main`` task per symbol id and wait for all of them.

    Each task is named ``task<id>``. Any ``Exception`` raised while creating
    or awaiting the tasks is reported through ``err_log``.
    """
    try:
        tasks = [
            asyncio.create_task(main(symbol_id), name='task{}'.format(symbol_id))
            for symbol_id in symbols_id_dict.values()
        ]
        await asyncio.gather(*tasks)
    except Exception as exc:
        err_log(name='Error in handler function', text=str(exc))
try:
    # When this file is imported rather than run, end the process right away.
    if __name__ != '__main__':
        os._exit(0)
    asyncio.run(handler())
except Exception as exc:
    err_log(name='Error in running asyncio handler', text=str(exc))
finally:
    # Always hard-exit with status 0, whether or not an error was logged.
    os._exit(0)
根据下面的行,每个任务都指定了一个名称:
# name= labels each task; main() reads the label back with asyncio.current_task().get_name().
tasks.append(asyncio.create_task(main(_id), name='task{}'.format(_id)))
所以每个任务都可以被识别。我该如何利用这一点来检测已关闭的 websocket？
try:
data = await ws.recv()
except (ConnectionClosed):
print("Connection is Closed")
data = None
print('Reconnecting')
websocket = await websockets.connect(params)
我有一个异步任务列表，其中每个任务都负责连接、握手以及从 websocket 接收数据。这个过程运行正常，但有时其中一个（或全部）websocket 的连接会关闭。我怎样才能检测到已关闭的连接，并重新建立新的连接？
这是我使用的代码:
async def main(_id):
    """Subscribe to market ``_id`` over a websocket and print every message.

    Opens a connection to ``wss://ws.bitpin.ir/``, performs the two-step
    subscription handshake, starts ``ping_func`` as a background keepalive
    task and then receives messages forever, handing each one to
    ``return_func`` together with a running counter and the current task name.

    Args:
        _id: Market identifier sent in the ``sub_to_market`` request.

    Returns:
        None. The coroutine only returns when the handshake reply is not the
        expected one or when an exception (for example a closed connection)
        ends the receive loop; both cases are reported through ``err_log``.
    """
    try:
        # ping_interval=None turns off the websockets library's own keepalive
        # pings; ping_func sends application-level PING messages instead.
        async with websockets.connect(
            "wss://ws.bitpin.ir/",
            extra_headers=request_header,
            timeout=10,
            ping_interval=None,
        ) as websocket:
            await websocket.send('{"method":"sub_to_price_info"}')
            recv_msg = await websocket.recv()
            if recv_msg != '{"message": "sub to price info"}':
                # Report the failed handshake instead of ending silently, so a
                # task that stops here can be told apart from a healthy one.
                err_log(
                    name='Error in main function',
                    text='Unexpected subscription reply: {}'.format(recv_msg),
                )
                return

            await websocket.send(json.dumps({"method": "sub_to_market", "id": _id}))
            recv_msg = await websocket.recv()
            print(recv_msg)

            counter = 1
            task = asyncio.create_task(ping_func(websocket))
            try:
                while True:
                    msg = await websocket.recv()
                    # Report the received message.
                    return_func(msg, counter, asyncio.current_task().get_name())
                    counter += 1
            finally:
                # Stop the keepalive task as soon as the receive loop ends,
                # rather than leaving it to fail on its next send.
                task.cancel()
    except Exception as e:
        err_log(name='Error in main function', text=str(e))
async def ping_func(websocket):
    """Send an application-level PING message on ``websocket`` every 5 seconds.

    Loops until sending or sleeping raises. Any ``Exception`` is reported
    through ``err_log`` and the coroutine then returns.

    Args:
        websocket: Connection object exposing an awaitable ``send`` method.
    """
    ping_payload = '{"message":"PING"}'
    interval_seconds = 5
    try:
        while True:
            await websocket.send(ping_payload)
            await asyncio.sleep(interval_seconds)
    except Exception as exc:
        err_log(name='Error in ping function', text=str(exc))
def return_func(msg, counter, task_name):
    """Print a one-line summary of a received websocket message.

    Messages other than the PONG reply whose decoded JSON value has fewer
    than 20 elements are printed with their first 100 characters. Everything
    else (PONG replies, large payloads, and messages that are not JSON or
    decode to a value without a length) is printed as task name and counter
    only.

    Args:
        msg: Raw message as received from the websocket.
        counter: Sequence number of the message within its task.
        task_name: Name of the asyncio task that received the message.
    """
    is_short_payload = False
    if msg != '{"message": "PONG"}':
        try:
            is_short_payload = len(json.loads(msg)) < 20
        except (ValueError, TypeError):
            # Not valid JSON, or a JSON scalar with no len(): treat it like a
            # large payload so one odd frame cannot abort the caller's loop.
            is_short_payload = False

    if is_short_payload:
        print(task_name, counter, msg[:100])
    else:
        print(task_name, counter)
async def handler():
    """Start one ``main`` task per symbol id and wait for all of them.

    Each task is named ``task<id>``. Any ``Exception`` raised while creating
    or awaiting the tasks is reported through ``err_log``.
    """
    try:
        tasks = [
            asyncio.create_task(main(symbol_id), name='task{}'.format(symbol_id))
            for symbol_id in symbols_id_dict.values()
        ]
        await asyncio.gather(*tasks)
    except Exception as exc:
        err_log(name='Error in handler function', text=str(exc))
try:
    # When this file is imported rather than run, end the process right away.
    if __name__ != '__main__':
        os._exit(0)
    asyncio.run(handler())
except Exception as exc:
    err_log(name='Error in running asyncio handler', text=str(exc))
finally:
    # Always hard-exit with status 0, whether or not an error was logged.
    os._exit(0)
根据下面的行,每个任务都指定了一个名称:
# name= labels each task; main() reads the label back with asyncio.current_task().get_name().
tasks.append(asyncio.create_task(main(_id), name='task{}'.format(_id)))
所以每个任务都可以被识别。我该如何利用这一点来检测已关闭的 websocket？
try:
data = await ws.recv()
except (ConnectionClosed):
print("Connection is Closed")
data = None
print('Reconnecting')
websocket = await websockets.connect(params)