Python 异步 loop.run_forever()

Python asyncio loop.run_forever()

我尝试做一个交易机器人,大多数事情都运行良好。但是每次互联网连接短时间消失时,代码都会失败。我使用 asyncio run_forever() 函数,我认为代码应该 运行 永远直到它停止,但它不起作用。

这是我的代码:

import json


async def listen():
    url = "wss://phemex.com/ws"
    async with websockets.connect(url) as ws:
        sub_msg = json.dumps({
            "id": sequence,
            "method": "kline.subscribe",
            "params": [symbol, kline_interval],
        })
        await ws.send(sub_msg)
        while True:
            msg = await ws.recv()
            msg = json.loads(msg)["kline"]

我这样调用循环:

loop = asyncio.get_event_loop()
try: 
    loop.create_task(listen())
    asyncio.ensure_future(listen())
    loop.run_forever()
except KeyboardInterrupt:
    pass
finally:
    print("Closing Loop")
    loop.close()

一失去连接,出现如下错误:

Task exception was never retrieved
future: <Task finished coro=<listen() done, defined at c:\Users\danis\Desktop\AWS\Dateien\Crypto_Bot_Telegram_PSAR_PHEMEX_inkl_websocket_reconnect.py:351> exception=ConnectionClosedError('code = 1006 (connection closed abnormally [internal]), no reason',)>
Traceback (most recent call last):
  File "C:\Users\danis\.conda\envs\python36\lib\site-packages\websockets\legacy\protocol.py", line 750, in transfer_data
    message = await self.read_message()
  File "C:\Users\danis\.conda\envs\python36\lib\site-packages\websockets\legacy\protocol.py", line 819, in read_message
    frame = await self.read_data_frame(max_size=self.max_size)
  File "C:\Users\danis\.conda\envs\python36\lib\site-packages\websockets\legacy\protocol.py", line 895, in read_data_frame
    frame = await self.read_frame(max_size)
  File "C:\Users\danis\.conda\envs\python36\lib\site-packages\websockets\legacy\protocol.py", line 975, in read_frame
    extensions=self.extensions,
  File "C:\Users\danis\.conda\envs\python36\lib\site-packages\websockets\legacy\framing.py", line 55, in read
    data = await reader(2)
  File "C:\Users\danis\.conda\envs\python36\lib\asyncio\streams.py", line 674, in readexactly
    yield from self._wait_for_data('readexactly')
  File "C:\Users\danis\.conda\envs\python36\lib\asyncio\streams.py", line 464, in _wait_for_data
    yield from self._waiter
  File "C:\Users\danis\.conda\envs\python36\lib\asyncio\selector_events.py", line 714, in _read_ready
    data = self._sock.recv(self.max_size)
ConnectionResetError: [WinError 10054] Eine vorhandene Verbindung wurde vom Remotehost geschlossen

我怎样才能运行这个代码永远?

你的循环 运行 永远,但它无法阻止程序因未处理的异常而崩溃。在您的情况下,您尝试 recv 来自可能失去连接的客户端的数据。你可以自己捕获这个异常!

while True:
    async with websockets.connect(url) as ws:
        sub_msg = "{\"id\":" + str(
            sequence) + ", \"method\": \"kline.subscribe\", \"params\":[\"" + symbol + "\"," 
            + str(kline_interval) + "]}"
        await ws.send(sub_msg)

        while True:
            try: 
                msg = await ws.recv()
            except ConnectionResetError:
                break
            msg = json.loads(msg)["kline"]

asyncio 中,事件循环负责 运行 异步任务,但它不处理它们可能抛出的错误。 运行 循环有不同的方式,你可以 运行 它来执行特定的任务,或者 运行 它永远,所以它会让 运行ning 等待新任务。

在你的情况下,任务 listen 抛出一个未捕获的异常(ConnectionResetError),事件循环通过回溯通知你,但它保持 运行ning,也许与其他任务(这意味着 运行 永远)。您刚刚收到通知,就像协程中发生错误一样,任务停止 运行ning。

解法: 在你的协程中处理回溯的错误,你必须确保它会 运行 永远,事件循环不会那样做。

async def listen():
    url = "wss://phemex.com/ws"
    while True:
        try:
            async with websockets.connect(url) as ws:
                sub_msg = "{\"id\":" + str(
                    sequence) + ", \"method\": \"kline.subscribe\", \"params\":[\"" + symbol + "\"," 
                    + str(kline_interval) + "]}"
                await ws.send(sub_msg)
                while True:
                    msg = await ws.recv()
                    msg = json.loads(msg)["kline"]
        except ConnectionResetError:
            print("ConnectionResetError, reconnecting...")