Python 异步 loop.run_forever()
Python asyncio loop.run_forever()
我尝试做一个交易机器人,大多数事情都运行良好。但是每次互联网连接短时间消失时,代码都会失败。我使用 asyncio run_forever() 函数,我认为代码应该 运行 永远直到它停止,但它不起作用。
这是我的代码:
import json
async def listen():
url = "wss://phemex.com/ws"
async with websockets.connect(url) as ws:
sub_msg = json.dumps({
"id": sequence,
"method": "kline.subscribe",
"params": [symbol, kline_interval],
})
await ws.send(sub_msg)
while True:
msg = await ws.recv()
msg = json.loads(msg)["kline"]
我这样调用循环:
loop = asyncio.get_event_loop()
try:
loop.create_task(listen())
asyncio.ensure_future(listen())
loop.run_forever()
except KeyboardInterrupt:
pass
finally:
print("Closing Loop")
loop.close()
一失去连接,出现如下错误:
Task exception was never retrieved
future: <Task finished coro=<listen() done, defined at c:\Users\danis\Desktop\AWS\Dateien\Crypto_Bot_Telegram_PSAR_PHEMEX_inkl_websocket_reconnect.py:351> exception=ConnectionClosedError('code = 1006 (connection closed abnormally [internal]), no reason',)>
Traceback (most recent call last):
File "C:\Users\danis\.conda\envs\python36\lib\site-packages\websockets\legacy\protocol.py", line 750, in transfer_data
message = await self.read_message()
File "C:\Users\danis\.conda\envs\python36\lib\site-packages\websockets\legacy\protocol.py", line 819, in read_message
frame = await self.read_data_frame(max_size=self.max_size)
File "C:\Users\danis\.conda\envs\python36\lib\site-packages\websockets\legacy\protocol.py", line 895, in read_data_frame
frame = await self.read_frame(max_size)
File "C:\Users\danis\.conda\envs\python36\lib\site-packages\websockets\legacy\protocol.py", line 975, in read_frame
extensions=self.extensions,
File "C:\Users\danis\.conda\envs\python36\lib\site-packages\websockets\legacy\framing.py", line 55, in read
data = await reader(2)
File "C:\Users\danis\.conda\envs\python36\lib\asyncio\streams.py", line 674, in readexactly
yield from self._wait_for_data('readexactly')
File "C:\Users\danis\.conda\envs\python36\lib\asyncio\streams.py", line 464, in _wait_for_data
yield from self._waiter
File "C:\Users\danis\.conda\envs\python36\lib\asyncio\selector_events.py", line 714, in _read_ready
data = self._sock.recv(self.max_size)
ConnectionResetError: [WinError 10054] Eine vorhandene Verbindung wurde vom Remotehost geschlossen
我怎样才能运行这个代码永远?
你的循环 运行 永远,但它无法阻止程序因未处理的异常而崩溃。在您的情况下,您尝试 recv
来自可能失去连接的客户端的数据。你可以自己捕获这个异常!
while True:
async with websockets.connect(url) as ws:
sub_msg = "{\"id\":" + str(
sequence) + ", \"method\": \"kline.subscribe\", \"params\":[\"" + symbol + "\","
+ str(kline_interval) + "]}"
await ws.send(sub_msg)
while True:
try:
msg = await ws.recv()
except ConnectionResetError:
break
msg = json.loads(msg)["kline"]
在 asyncio
中,事件循环负责 运行 异步任务,但它不处理它们可能抛出的错误。 运行 循环有不同的方式,你可以 运行 它来执行特定的任务,或者 运行 它永远,所以它会让 运行ning 等待新任务。
在你的情况下,任务 listen
抛出一个未捕获的异常(ConnectionResetError
),事件循环通过回溯通知你,但它保持 运行ning,也许与其他任务(这意味着 运行 永远)。您刚刚收到通知,就像协程中发生错误一样,任务停止 运行ning。
解法:
在你的协程中处理回溯的错误,你必须确保它会 运行 永远,事件循环不会那样做。
async def listen():
url = "wss://phemex.com/ws"
while True:
try:
async with websockets.connect(url) as ws:
sub_msg = "{\"id\":" + str(
sequence) + ", \"method\": \"kline.subscribe\", \"params\":[\"" + symbol + "\","
+ str(kline_interval) + "]}"
await ws.send(sub_msg)
while True:
msg = await ws.recv()
msg = json.loads(msg)["kline"]
except ConnectionResetError:
print("ConnectionResetError, reconnecting...")
我尝试做一个交易机器人,大多数事情都运行良好。但是每次互联网连接短时间消失时,代码都会失败。我使用 asyncio run_forever() 函数,我认为代码应该 运行 永远直到它停止,但它不起作用。
这是我的代码:
import json
async def listen():
url = "wss://phemex.com/ws"
async with websockets.connect(url) as ws:
sub_msg = json.dumps({
"id": sequence,
"method": "kline.subscribe",
"params": [symbol, kline_interval],
})
await ws.send(sub_msg)
while True:
msg = await ws.recv()
msg = json.loads(msg)["kline"]
我这样调用循环:
loop = asyncio.get_event_loop()
try:
loop.create_task(listen())
asyncio.ensure_future(listen())
loop.run_forever()
except KeyboardInterrupt:
pass
finally:
print("Closing Loop")
loop.close()
一失去连接,出现如下错误:
Task exception was never retrieved
future: <Task finished coro=<listen() done, defined at c:\Users\danis\Desktop\AWS\Dateien\Crypto_Bot_Telegram_PSAR_PHEMEX_inkl_websocket_reconnect.py:351> exception=ConnectionClosedError('code = 1006 (connection closed abnormally [internal]), no reason',)>
Traceback (most recent call last):
File "C:\Users\danis\.conda\envs\python36\lib\site-packages\websockets\legacy\protocol.py", line 750, in transfer_data
message = await self.read_message()
File "C:\Users\danis\.conda\envs\python36\lib\site-packages\websockets\legacy\protocol.py", line 819, in read_message
frame = await self.read_data_frame(max_size=self.max_size)
File "C:\Users\danis\.conda\envs\python36\lib\site-packages\websockets\legacy\protocol.py", line 895, in read_data_frame
frame = await self.read_frame(max_size)
File "C:\Users\danis\.conda\envs\python36\lib\site-packages\websockets\legacy\protocol.py", line 975, in read_frame
extensions=self.extensions,
File "C:\Users\danis\.conda\envs\python36\lib\site-packages\websockets\legacy\framing.py", line 55, in read
data = await reader(2)
File "C:\Users\danis\.conda\envs\python36\lib\asyncio\streams.py", line 674, in readexactly
yield from self._wait_for_data('readexactly')
File "C:\Users\danis\.conda\envs\python36\lib\asyncio\streams.py", line 464, in _wait_for_data
yield from self._waiter
File "C:\Users\danis\.conda\envs\python36\lib\asyncio\selector_events.py", line 714, in _read_ready
data = self._sock.recv(self.max_size)
ConnectionResetError: [WinError 10054] Eine vorhandene Verbindung wurde vom Remotehost geschlossen
我怎样才能运行这个代码永远?
你的循环 运行 永远,但它无法阻止程序因未处理的异常而崩溃。在您的情况下,您尝试 recv
来自可能失去连接的客户端的数据。你可以自己捕获这个异常!
while True:
async with websockets.connect(url) as ws:
sub_msg = "{\"id\":" + str(
sequence) + ", \"method\": \"kline.subscribe\", \"params\":[\"" + symbol + "\","
+ str(kline_interval) + "]}"
await ws.send(sub_msg)
while True:
try:
msg = await ws.recv()
except ConnectionResetError:
break
msg = json.loads(msg)["kline"]
在 asyncio
中,事件循环负责 运行 异步任务,但它不处理它们可能抛出的错误。 运行 循环有不同的方式,你可以 运行 它来执行特定的任务,或者 运行 它永远,所以它会让 运行ning 等待新任务。
在你的情况下,任务 listen
抛出一个未捕获的异常(ConnectionResetError
),事件循环通过回溯通知你,但它保持 运行ning,也许与其他任务(这意味着 运行 永远)。您刚刚收到通知,就像协程中发生错误一样,任务停止 运行ning。
解法: 在你的协程中处理回溯的错误,你必须确保它会 运行 永远,事件循环不会那样做。
async def listen():
url = "wss://phemex.com/ws"
while True:
try:
async with websockets.connect(url) as ws:
sub_msg = "{\"id\":" + str(
sequence) + ", \"method\": \"kline.subscribe\", \"params\":[\"" + symbol + "\","
+ str(kline_interval) + "]}"
await ws.send(sub_msg)
while True:
msg = await ws.recv()
msg = json.loads(msg)["kline"]
except ConnectionResetError:
print("ConnectionResetError, reconnecting...")