Asyncio 使用 StreamReader 解码 utf-8

Asyncio decode utf-8 with StreamReader

我正在习惯 asyncio 并发现任务处理非常好,但是很难将异步库与传统的 io 库混合使用。我目前面临的问题是如何正确解码异步 StreamReader。

最简单的解决方案是 read() 字节字符串块,然后对每个块进行解码 - 请参见下面的代码。 (在我的程序中,我不会打印每个块,而是将其解码为字符串并将其发送到另一个方法中进行处理):

import asyncio
import aiohttp

async def get_data(port):
    url = 'http://localhost:{}/'.format(port)
    r = await aiohttp.get(url)
    stream = r.content
    while not stream.at_eof():
        data = await stream.read(4)
        print(data.decode('utf-8'))

这工作正常,直到有一个 utf-8 字符被分成太多块。例如,如果响应是 b'M\xc3\xa4dchen mit Bi\xc3\x9f\n',那么读取 3 个块将起作用,但 4 个块将不起作用(因为 \xc3\x9f 在不同的块中并且解码以 \xc3 将引发以下错误:

UnicodeDecodeError: 'utf-8' codec can't decode byte 0xc3 in position 3: unexpected end of data

我查看了这个问题的正确解决方案,至少在阻塞世界中,似乎是 io.TextIOWrapper 或 codecs.StreamReaderWriter(它们的区别在 PEP 0400 中讨论) ).但是,这两者都依赖于典型的阻塞流。

我花了 30 分钟用 asyncio 搜索示例,并不断找到我的 decode() 解决方案。有谁知道更好的解决方案,或者这是 python 的 asyncio 中缺少的功能?

作为参考,以下是将两个 "standard" 解码器与异步流一起使用的结果。

使用编解码流reader:

r = yield from aiohttp.get(url)
decoder = codecs.getreader('utf-8')
stream = decoder(r.content)

异常:

File "echo_client.py", line 13, in get_data
  data = yield from stream.read(4)
File "/usr/lib/python3.5/codecs.py", line 497, in read
  data = self.bytebuffer + newdata
TypeError: can't concat bytes to generator

(它直接调用 read(),而不是 yield fromawait

我也试过用 io.TextIOWrapper:

包装流
stream = TextIOWrapper(r.content)

但这会导致以下结果:

File "echo_client.py", line 10, in get_data
  stream = TextIOWrapper(r.content)
AttributeError: 'FlowControlStreamReader' object has no attribute 'readable'

P.S。如果你想要一个示例测试用例,请查看 this gist。您可以 运行 它与 python3.5 重现错误。如果将块大小从 4 更改为 3(或 30),它将正常工作。

编辑

接受的答案解决了这个问题。谢谢!如果其他人有这个问题,这里有一个简单的包装器 class 我用来处理 StreamReader 上的解码:

import codecs

class DecodingStreamReader:
    def __init__(self, stream, encoding='utf-8', errors='strict'):
        self.stream = stream
        self.decoder = codecs.getincrementaldecoder(encoding)(errors=errors)

    async def read(self, n=-1):
        data = await self.stream.read(n)
        if isinstance(data, (bytes, bytearray)):
            data = self.decoder.decode(data)
        return data

    def at_eof(self):
        return self.stream.at_eof() 

您可以使用 IncrementalDecoder:

Utf8Decoder = codecs.getincrementaldecoder('utf-8')

以你为例:

decoder = Utf8Decoder(error='strict')
while not stream.at_eof():
    data = await stream.read(4)
    print(decoder.decode(data), end='')

输出:

Mädchen mit Biß