使用 asyncio 与其他对等方连接时如何处理 ConnectionRefusedError

How to handle ConnectionRefusedError when connecting with other peers using asyncio

我使用 asyncio 与我正在使用的 bittorrent 客户端中的其他对等方连接。当无法连接某些对等点时,程序会崩溃并出现 ConnectionRefusedErrorTimeoutError 异常。一些对等点无法连接是正常的还是我的代码有问题。如果那是正常的,我应该如何处理异常?我试图将 try except 放在 loop.create_connection() 周围,但那没有做任何事情。

这是我的代码:

class Torrent():
    def __init__(self, torrent_file, loop):
        self.torrent = Torrent(torrent_file)
        self.peers = self.get_peers()
        self.loop = loop

    ...

    def connect_to_peers(self):
        tasks = []
        for peer in self.peers:
            try:
                # returns a coroutine
                connection = self.loop.create_connection(PeerProtocol, peer['host'], peer['port'])
                tasks.append(asyncio.Task(connection))
            except ConnectionRefusedError:
                print('caught')
            except TimeoutError:
                print('timeout error')

        return tasks


def main():
    loop = asyncio.get_event_loop()

    filename = 'street-fighter.torrent'
    client = TorrentClient(filename, loop)
    tasks = client.connect_to_peers()

    try:
        loop.run_until_complete(asyncio.wait(tasks))
    except KeyboardInterrupt:
        pass

class PeerProtocol(asyncio.Protocol):

    def connection_made(self, transport):
        host, port = transport.get_extra_info('peername')
        print('connected with {}:{}'.format(host, port))

    def connection_lost(self, exc):
        print('disconnected...')
        print('exc: {}'.format(exc))

这是输出:

connected with 80.94.76.7:14122
connected with 174.110.236.233:45308
connected with 78.177.119.170:27311
connected with 95.15.59.242:21426
disconnected...
exc: [Errno 54] Connection reset by peer
disconnected...
exc: [Errno 54] Connection reset by peer
disconnected...
exc: None
disconnected...
exc: None
Task exception was never retrieved
future: <Task finished coro=<BaseEventLoop.create_connection() done, defined at /Users/shangsunset/.pyenv/versions/3.5.1/lib/python3.5/asyncio/base_events.py:548> exception=TimeoutError(60, "Connect call failed ('93.34.49.17', 13311)")>
Traceback (most recent call last):
  File "/Users/shangsunset/.pyenv/versions/3.5.1/lib/python3.5/asyncio/tasks.py", line 241, in _step
    result = coro.throw(exc)
  File "/Users/shangsunset/.pyenv/versions/3.5.1/lib/python3.5/asyncio/base_events.py", line 645, in create_connection
    raise exceptions[0]
  File "/Users/shangsunset/.pyenv/versions/3.5.1/lib/python3.5/asyncio/base_events.py", line 632, in create_connection
    yield from self.sock_connect(sock, address)
  File "/Users/shangsunset/.pyenv/versions/3.5.1/lib/python3.5/asyncio/futures.py", line 358, in __iter__
    yield self  # This tells Task to wait for completion.
  File "/Users/shangsunset/.pyenv/versions/3.5.1/lib/python3.5/asyncio/tasks.py", line 290, in _wakeup
    future.result()
  File "/Users/shangsunset/.pyenv/versions/3.5.1/lib/python3.5/asyncio/futures.py", line 274, in result
    raise self._exception
  File "/Users/shangsunset/.pyenv/versions/3.5.1/lib/python3.5/asyncio/selector_events.py", line 436, in _sock_connect_cb
    raise OSError(err, 'Connect call failed %s' % (address,))
TimeoutError: [Errno 60] Connect call failed ('93.34.49.17', 13311)
Task exception was never retrieved
future: <Task finished coro=<BaseEventLoop.create_connection() done, defined at /Users/shangsunset/.pyenv/versions/3.5.1/lib/python3.5/asyncio/base_events.py:548> exception=TimeoutError(60, "Connect call failed ('197.29.6.31', 50735)")>
Traceback (most recent call last):
  File "/Users/shangsunset/.pyenv/versions/3.5.1/lib/python3.5/asyncio/tasks.py", line 241, in _step
    result = coro.throw(exc)
  File "/Users/shangsunset/.pyenv/versions/3.5.1/lib/python3.5/asyncio/base_events.py", line 645, in create_connection
    raise exceptions[0]
  File "/Users/shangsunset/.pyenv/versions/3.5.1/lib/python3.5/asyncio/base_events.py", line 632, in create_connection
    yield from self.sock_connect(sock, address)
  File "/Users/shangsunset/.pyenv/versions/3.5.1/lib/python3.5/asyncio/futures.py", line 358, in __iter__
    yield self  # This tells Task to wait for completion.
  File "/Users/shangsunset/.pyenv/versions/3.5.1/lib/python3.5/asyncio/tasks.py", line 290, in _wakeup
    future.result()
  File "/Users/shangsunset/.pyenv/versions/3.5.1/lib/python3.5/asyncio/futures.py", line 274, in result
    raise self._exception
  File "/Users/shangsunset/.pyenv/versions/3.5.1/lib/python3.5/asyncio/selector_events.py", line 436, in _sock_connect_cb
    raise OSError(err, 'Connect call failed %s' % (address,))
TimeoutError: [Errno 60] Connect call failed ('197.29.6.31', 50735)
Task exception was never retrieved
future: <Task finished coro=<BaseEventLoop.create_connection() done, defined at /Users/shangsunset/.pyenv/versions/3.5.1/lib/python3.5/asyncio/base_events.py:548> exception=TimeoutError(60, "Connect call failed ('195.174.165.47', 61567)")>
Traceback (most recent call last):
  File "/Users/shangsunset/.pyenv/versions/3.5.1/lib/python3.5/asyncio/tasks.py", line 241, in _step
    result = coro.throw(exc)
  File "/Users/shangsunset/.pyenv/versions/3.5.1/lib/python3.5/asyncio/base_events.py", line 645, in create_connection
    raise exceptions[0]
  File "/Users/shangsunset/.pyenv/versions/3.5.1/lib/python3.5/asyncio/base_events.py", line 632, in create_connection
    yield from self.sock_connect(sock, address)
  File "/Users/shangsunset/.pyenv/versions/3.5.1/lib/python3.5/asyncio/futures.py", line 358, in __iter__
    yield self  # This tells Task to wait for completion.
  File "/Users/shangsunset/.pyenv/versions/3.5.1/lib/python3.5/asyncio/tasks.py", line 290, in _wakeup
    future.result()
  File "/Users/shangsunset/.pyenv/versions/3.5.1/lib/python3.5/asyncio/futures.py", line 274, in result
    raise self._exception
  File "/Users/shangsunset/.pyenv/versions/3.5.1/lib/python3.5/asyncio/selector_events.py", line 436, in _sock_connect_cb
    raise OSError(err, 'Connect call failed %s' % (address,))
TimeoutError: [Errno 60] Connect call failed ('195.174.165.47', 61567)
Task exception was never retrieved
future: <Task finished coro=<BaseEventLoop.create_connection() done, defined at /Users/shangsunset/.pyenv/versions/3.5.1/lib/python3.5/asyncio/base_events.py:548> exception=ConnectionRefusedError(61, "Connect call failed ('69.122.194.81', 6881)")>
Traceback (most recent call last):
  File "/Users/shangsunset/.pyenv/versions/3.5.1/lib/python3.5/asyncio/tasks.py", line 241, in _step
    result = coro.throw(exc)
  File "/Users/shangsunset/.pyenv/versions/3.5.1/lib/python3.5/asyncio/base_events.py", line 645, in create_connection
    raise exceptions[0]
  File "/Users/shangsunset/.pyenv/versions/3.5.1/lib/python3.5/asyncio/base_events.py", line 632, in create_connection
    yield from self.sock_connect(sock, address)
  File "/Users/shangsunset/.pyenv/versions/3.5.1/lib/python3.5/asyncio/futures.py", line 358, in __iter__
    yield self  # This tells Task to wait for completion.
  File "/Users/shangsunset/.pyenv/versions/3.5.1/lib/python3.5/asyncio/tasks.py", line 290, in _wakeup
    future.result()
  File "/Users/shangsunset/.pyenv/versions/3.5.1/lib/python3.5/asyncio/futures.py", line 274, in result
    raise self._exception
  File "/Users/shangsunset/.pyenv/versions/3.5.1/lib/python3.5/asyncio/selector_events.py", line 436, in _sock_connect_cb
    raise OSError(err, 'Connect call failed %s' % (address,))
ConnectionRefusedError: [Errno 61] Connect call failed ('69.122.194.81', 6881)
Task exception was never retrieved
future: <Task finished coro=<BaseEventLoop.create_connection() done, defined at /Users/shangsunset/.pyenv/versions/3.5.1/lib/python3.5/asyncio/base_events.py:548> exception=TimeoutError(60, "Connect call failed ('41.210.123.12', 48319)")>
Traceback (most recent call last):
  File "/Users/shangsunset/.pyenv/versions/3.5.1/lib/python3.5/asyncio/tasks.py", line 241, in _step
    result = coro.throw(exc)
  File "/Users/shangsunset/.pyenv/versions/3.5.1/lib/python3.5/asyncio/base_events.py", line 645, in create_connection
    raise exceptions[0]
  File "/Users/shangsunset/.pyenv/versions/3.5.1/lib/python3.5/asyncio/base_events.py", line 632, in create_connection
    yield from self.sock_connect(sock, address)
  File "/Users/shangsunset/.pyenv/versions/3.5.1/lib/python3.5/asyncio/futures.py", line 358, in __iter__
    yield self  # This tells Task to wait for completion.
  File "/Users/shangsunset/.pyenv/versions/3.5.1/lib/python3.5/asyncio/tasks.py", line 290, in _wakeup
    future.result()
  File "/Users/shangsunset/.pyenv/versions/3.5.1/lib/python3.5/asyncio/futures.py", line 274, in result
    raise self._exception
  File "/Users/shangsunset/.pyenv/versions/3.5.1/lib/python3.5/asyncio/selector_events.py", line 436, in _sock_connect_cb
    raise OSError(err, 'Connect call failed %s' % (address,))
TimeoutError: [Errno 60] Connect call failed ('41.210.123.12', 48319)
Task exception was never retrieved
future: <Task finished coro=<BaseEventLoop.create_connection() done, defined at /Users/shangsunset/.pyenv/versions/3.5.1/lib/python3.5/asyncio/base_events.py:548> exception=TimeoutError(60, "Connect call failed ('78.174.159.195', 35414)")>
Traceback (most recent call last):
  File "/Users/shangsunset/.pyenv/versions/3.5.1/lib/python3.5/asyncio/tasks.py", line 241, in _step
    result = coro.throw(exc)
  File "/Users/shangsunset/.pyenv/versions/3.5.1/lib/python3.5/asyncio/base_events.py", line 645, in create_connection
    raise exceptions[0]
  File "/Users/shangsunset/.pyenv/versions/3.5.1/lib/python3.5/asyncio/base_events.py", line 632, in create_connection
    yield from self.sock_connect(sock, address)
  File "/Users/shangsunset/.pyenv/versions/3.5.1/lib/python3.5/asyncio/futures.py", line 358, in __iter__
    yield self  # This tells Task to wait for completion.
  File "/Users/shangsunset/.pyenv/versions/3.5.1/lib/python3.5/asyncio/tasks.py", line 290, in _wakeup
    future.result()
  File "/Users/shangsunset/.pyenv/versions/3.5.1/lib/python3.5/asyncio/futures.py", line 274, in result
    raise self._exception
  File "/Users/shangsunset/.pyenv/versions/3.5.1/lib/python3.5/asyncio/selector_events.py", line 436, in _sock_connect_cb
    raise OSError(err, 'Connect call failed %s' % (address,))
TimeoutError: [Errno 60] Connect call failed ('78.174.159.195', 35414)
Task exception was never retrieved
future: <Task finished coro=<BaseEventLoop.create_connection() done, defined at /Users/shangsunset/.pyenv/versions/3.5.1/lib/python3.5/asyncio/base_events.py:548> exception=TimeoutError(60, "Connect call failed ('85.103.126.106', 22665)")>
Traceback (most recent call last):
  File "/Users/shangsunset/.pyenv/versions/3.5.1/lib/python3.5/asyncio/tasks.py", line 241, in _step
    result = coro.throw(exc)
  File "/Users/shangsunset/.pyenv/versions/3.5.1/lib/python3.5/asyncio/base_events.py", line 645, in create_connection
    raise exceptions[0]
  File "/Users/shangsunset/.pyenv/versions/3.5.1/lib/python3.5/asyncio/base_events.py", line 632, in create_connection
    yield from self.sock_connect(sock, address)
  File "/Users/shangsunset/.pyenv/versions/3.5.1/lib/python3.5/asyncio/futures.py", line 358, in __iter__
    yield self  # This tells Task to wait for completion.
  File "/Users/shangsunset/.pyenv/versions/3.5.1/lib/python3.5/asyncio/tasks.py", line 290, in _wakeup
    future.result()
  File "/Users/shangsunset/.pyenv/versions/3.5.1/lib/python3.5/asyncio/futures.py", line 274, in result
    raise self._exception
  File "/Users/shangsunset/.pyenv/versions/3.5.1/lib/python3.5/asyncio/selector_events.py", line 436, in _sock_connect_cb
    raise OSError(err, 'Connect call failed %s' % (address,))
TimeoutError: [Errno 60] Connect call failed ('85.103.126.106', 22665)
Task exception was never retrieved
future: <Task finished coro=<BaseEventLoop.create_connection() done, defined at /Users/shangsunset/.pyenv/versions/3.5.1/lib/python3.5/asyncio/base_events.py:548> exception=TimeoutError(60, "Connect call failed ('81.228.224.142', 13570)")>
Traceback (most recent call last):
  File "/Users/shangsunset/.pyenv/versions/3.5.1/lib/python3.5/asyncio/tasks.py", line 241, in _step
    result = coro.throw(exc)
  File "/Users/shangsunset/.pyenv/versions/3.5.1/lib/python3.5/asyncio/base_events.py", line 645, in create_connection
    raise exceptions[0]
  File "/Users/shangsunset/.pyenv/versions/3.5.1/lib/python3.5/asyncio/base_events.py", line 632, in create_connection
    yield from self.sock_connect(sock, address)
  File "/Users/shangsunset/.pyenv/versions/3.5.1/lib/python3.5/asyncio/futures.py", line 358, in __iter__
    yield self  # This tells Task to wait for completion.
  File "/Users/shangsunset/.pyenv/versions/3.5.1/lib/python3.5/asyncio/tasks.py", line 290, in _wakeup
    future.result()
  File "/Users/shangsunset/.pyenv/versions/3.5.1/lib/python3.5/asyncio/futures.py", line 274, in result
    raise self._exception
  File "/Users/shangsunset/.pyenv/versions/3.5.1/lib/python3.5/asyncio/selector_events.py", line 436, in _sock_connect_cb
    raise OSError(err, 'Connect call failed %s' % (address,))
TimeoutError: [Errno 60] Connect call failed ('81.228.224.142', 13570)

我是 asyncio 的新手,不确定我的做法是否正确。

谢谢。

  1. asyncio.Task 立即创建任务并 return。您应该等待创建的任务以获取其结果(包括引发异常的情况)。有关详细信息,请参阅
  2. 我会 使用 asyncio.ensure_future 而不是 asyncio.Task
  3. 如果你想并行执行一些协程 asyncio.gather is common way 来做。任务通常需要在你想启动一些协程时 "in background".

基于所有代码可能看起来像这样(我没有测试):

class Torrent():

    # ...

    async def connect_to_peer(self, peer):
        try:
            # await, here exception would be raised
            await self.loop.create_connection(
                PeerProtocol, 
                peer['host'], 
                peer['port']
            )
        except ConnectionRefusedError:
            print('caught')
        except TimeoutError:
            print('timeout error')


    async def connect_to_peers(self):  # async function
        await asyncio.gather(
            *[self.connect_to_peer(peer) for peer in self.peers], 
            loop=self.loop  # fixed here!
        )
        # Btw, you can add param return_exceptions=True to get exceptions
        # in results here instead of ignoring it inside connect_to_peer

# ...

loop.run_until_complete(client.connect_to_peers())