Python 具有多个客户端和无限循环的 asyncio 协议行为

Python asyncio Protocol behaviour with multiple clients and infinite loop

我很难理解我改变后的回显服务器的行为,它试图利用 python 3 的 asyncio 模块。

基本上我有一个无限循环(假设我想在建立连接时无限期地将一些数据从服务器流式传输到客户端)例如MyServer.py:

#! /usr/bin/python3
import asyncio
import os
import time

class MyProtocol(asyncio.Protocol):

    def connection_made(self, transport):
        peername = transport.get_extra_info('peername')
        print('Connection from {}'.format(peername))
        self.transport = transport

    def connection_lost(self, exc):
        asyncio.get_event_loop().stop()

    def data_received(self, data):
        i = 0
        while True:
            self.transport.write(b'>> %i' %i)
            time.sleep(2)
            i+=1

loop = asyncio.get_event_loop()
coro = loop.create_server(MyProtocol, 
    os.environ.get('MY_SERVICE_ADDRESS', 'localhost'), 
    os.environ.get('MY_SERVICE_PORT', 8100))
server = loop.run_until_complete(coro)

try:
    loop.run_forever()
except:
    loop.run_until_complete(server.wait_closed())
finally:
    loop.close()

接下来,当我连接到 nc ::1 8100 并发送一些文本(例如 "testing")时,我得到以下信息:

user@machine$ nc ::1 8100
*** Connection from('::1', 58503, 0, 0) ***
testing
>> 1
>> 2
>> 3
^C

现在,当我尝试再次使用 nc 进行连接时,我没有收到任何欢迎消息,并且在我尝试向服务器发送一些新文本后,我收到以下错误消息:

user@machine$ nc ::1 8100
Is there anybody out there?
socket.send() raised exception
socket.send() raised exception
...
^C

只是在伤口上撒盐 socket.send() raised exception 消息继续向我的终端发送垃圾邮件,直到我终止 python 服务器进程...

由于我是网络技术的新手(作为桌面恐龙已经太久了!),我不确定为什么会出现上述行为,而且我不知道如何产生预期的行为,大致如下所示:

  1. 服务器启动
  2. 客户端 1 连接到服务器
  3. 服务器向客户端发送欢迎信息 4 客户端1发送任意消息
  4. 只要客户端连接,服务器就会将消息发送回客户端 1
  5. 客户端 1 断开连接(假设电缆已拔出)
  6. 客户端 2 连接到服务器
  7. 为客户端 2 重复步骤 3-6

非常欢迎任何启发!

代码存在多个问题。

首先,data_received 永远不会 returns。在 transport/protocol 级别,asyncio 编程是单线程和基于回调的。应用程序代码分散在 data_received 之类的回调中,事件循环 运行 显示,监控文件描述符并根据需要调用回调。每个回调只允许执行一个简短的计算,调用传输方法,并安排执行进一步的回调。回调不能做的是花很多时间来完成或阻塞等待某事。永不退出的 while 循环特别糟糕,因为它根本不允许事件循环达到 运行。

这就是为什么代码只会在客户端断开连接时吐出异常:connection_lost 永远不会被调用。它应该由事件循环调用,而 never-returning data_received 不会给事件循环一个恢复的机会。由于事件循环被阻塞,程序无法响应其他客户端,data_received 不断尝试向断开连接的客户端发送数据,并记录失败。

正确的表达方式如下:

def data_received(self, data):
    self.i = 0
    loop.call_soon(self.write_to_client)

def write_to_client(self):
    self.transport.write(b'>> %i' % self.i)
    self.i += 1
    loop.call_later(2, self.write_to_client)

请注意 data_receivedwrite_to_client 如何做很少的工作并且很快 return。没有调用 time.sleep(),绝对没有无限循环 - "loop" 隐藏在对 write_to_client.

的递归调用中

此更改揭示了代码中的第二个问题。它的 MyProtocol.connection_lost 停止整个事件循环并退出程序。这使得程序无法响应第二个客户端。解决方法是用 connection_lost:

中设置标志来替换 loop.stop()
def data_received(self, data):
    self._done = False
    self.i = 0
    loop.call_soon(self.write_to_client)

def write_to_client(self):
    if self._done:
        return
    self.transport.write(b'>> %i' % self.i)
    self.i += 1
    loop.call_later(2, self.write_to_client)

def connection_lost(self, exc):
    self._done = True

这允许多个客户端连接。


与上述问题无关,基于回调的代码写起来有点累,尤其是考虑到复杂的代码路径和异常处理时。 (想象一下,尝试使用回调来表达嵌套循环,或者传播在深度嵌入的回调中发生的异常。)asyncio 支持基于协程的 streams 作为基于回调的传输和协议的替代方案。

协程允许编写看起来自然的代码,其中包含循环并且看起来包含阻塞调用,这些调用在幕后被转换为使事件循环能够恢复的挂起点。使用流,问题中的代码将如下所示:

async def talk_to_client(reader, writer):
    peername = writer.get_extra_info('peername')
    print('Connection from {}'.format(peername))

    data = await reader.read(1024)
    i = 0
    while True:
        writer.write(b'>> %i' % i)
        await writer.drain()
        await asyncio.sleep(2)
        i += 1

loop = asyncio.get_event_loop()
coro = asyncio.start_server(talk_to_client, 
    os.environ.get('MY_SERVICE_ADDRESS', 'localhost'), 
    os.environ.get('MY_SERVICE_PORT', 8100))
server = loop.run_until_complete(coro)

loop.run_forever()

talk_to_client 看起来非常像 data_received 的原始实现,但没有缺点。在使用 await 的每个点,如果数据不可用,事件循环将恢复。 time.sleep(n) 替换为 await asyncio.sleep(n),相当于 loop.call_later(n, <resume current coroutine>)。等待 writer.drain() 确保协程在对等方无法处理它获得的输出时暂停,并在对等方断开连接时引发异常。