如何在 python asyncio 中处理 tcp 客户端套接字自动重新连接?

how to handle tcp client socket auto reconnect in python asyncio?

我正在使用 python asyncio 流连接到多个套接字服务器,但是当服务器关闭时,我的代码无法自动重新连接。

我需要的是,当服务器宕机时,我的脚本将每 5 秒尝试重新连接一次,直到连接成功并重新开始解析数据。

import asyncio

server1 = {'host': '192.168.1.51', 'port': 11110}
server2 = {'host': '192.168.1.52', 'port': 11110}


async def tcp_client(host, port, loop):
    print('connect to server {} {}'.format(host, str(port)))
    reader, writer = await asyncio.open_connection(host, port, loop=loop)

    while True:
        data = await reader.read(100)
        print('raw data received: {}'.format(data))

        await asyncio.sleep(0.1)


loop = asyncio.get_event_loop()
try:
    for server in [server1, server2]:
        loop.run_until_complete(tcp_client(server['host'], server['port'], loop))
        print('task added: connect to server {} {}'.format(server['host'], server['port']))
finally:
    loop.close()
    print('loop closed')

您可以通过简单地遍历 try/except 语句来处理重新连接。

此外,asyncio.wait_for 可用于设置读取操作的超时。

考虑这个工作示例:

import asyncio

async def tcp_client(host, port):
    reader, writer = await asyncio.open_connection(host, port)
    try:
        while not reader.at_eof():
            data = await asyncio.wait_for(reader.read(100), 3.0)
            print('raw data received: {}'.format(data))
    finally:
        writer.close()

async def tcp_reconnect(host, port):
    server = '{} {}'.format(host, port)
    while True:
        print('Connecting to server {} ...'.format(server))
        try:
            await tcp_client(host, port)
        except ConnectionRefusedError:
            print('Connection to server {} failed!'.format(server))
        except asyncio.TimeoutError:
            print('Connection to server {} timed out!'.format(server))
        else:
            print('Connection to server {} is closed.'.format(server))
        await asyncio.sleep(2.0)

async def main():
    servers = [('localhost', 8888), ('localhost', 9999)]
    coros = [tcp_reconnect(host, port) for host, port in servers]
    await asyncio.gather(*coros)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    loop.close()