Asyncio 使用 StreamReader 解码 utf-8
Asyncio decode utf-8 with StreamReader
我正在习惯 asyncio 并发现任务处理非常好,但是很难将异步库与传统的 io 库混合使用。我目前面临的问题是如何正确解码异步 StreamReader。
最简单的解决方案是 read()
字节字符串块,然后对每个块进行解码 - 请参见下面的代码。 (在我的程序中,我不会打印每个块,而是将其解码为字符串并将其发送到另一个方法中进行处理):
import asyncio
import aiohttp
async def get_data(port):
url = 'http://localhost:{}/'.format(port)
r = await aiohttp.get(url)
stream = r.content
while not stream.at_eof():
data = await stream.read(4)
print(data.decode('utf-8'))
这工作正常,直到有一个 utf-8 字符被分成太多块。例如,如果响应是 b'M\xc3\xa4dchen mit Bi\xc3\x9f\n'
,那么读取 3 个块将起作用,但 4 个块将不起作用(因为 \xc3
和 \x9f
在不同的块中并且解码以 \xc3
将引发以下错误:
UnicodeDecodeError: 'utf-8' codec can't decode byte 0xc3 in position 3: unexpected end of data
我查看了这个问题的正确解决方案,至少在阻塞世界中,似乎是 io.TextIOWrapper 或 codecs.StreamReaderWriter(它们的区别在 PEP 0400 中讨论) ).但是,这两者都依赖于典型的阻塞流。
我花了 30 分钟用 asyncio 搜索示例,并不断找到我的 decode() 解决方案。有谁知道更好的解决方案,或者这是 python 的 asyncio 中缺少的功能?
作为参考,以下是将两个 "standard" 解码器与异步流一起使用的结果。
使用编解码流reader:
r = yield from aiohttp.get(url)
decoder = codecs.getreader('utf-8')
stream = decoder(r.content)
异常:
File "echo_client.py", line 13, in get_data
data = yield from stream.read(4)
File "/usr/lib/python3.5/codecs.py", line 497, in read
data = self.bytebuffer + newdata
TypeError: can't concat bytes to generator
(它直接调用 read(),而不是 yield from
或 await
)
我也试过用 io.TextIOWrapper:
包装流
stream = TextIOWrapper(r.content)
但这会导致以下结果:
File "echo_client.py", line 10, in get_data
stream = TextIOWrapper(r.content)
AttributeError: 'FlowControlStreamReader' object has no attribute 'readable'
P.S。如果你想要一个示例测试用例,请查看 this gist。您可以 运行 它与 python3.5 重现错误。如果将块大小从 4 更改为 3(或 30),它将正常工作。
编辑
接受的答案解决了这个问题。谢谢!如果其他人有这个问题,这里有一个简单的包装器 class 我用来处理 StreamReader 上的解码:
import codecs
class DecodingStreamReader:
def __init__(self, stream, encoding='utf-8', errors='strict'):
self.stream = stream
self.decoder = codecs.getincrementaldecoder(encoding)(errors=errors)
async def read(self, n=-1):
data = await self.stream.read(n)
if isinstance(data, (bytes, bytearray)):
data = self.decoder.decode(data)
return data
def at_eof(self):
return self.stream.at_eof()
您可以使用 IncrementalDecoder:
Utf8Decoder = codecs.getincrementaldecoder('utf-8')
以你为例:
decoder = Utf8Decoder(error='strict')
while not stream.at_eof():
data = await stream.read(4)
print(decoder.decode(data), end='')
输出:
Mädchen mit Biß
我正在习惯 asyncio 并发现任务处理非常好,但是很难将异步库与传统的 io 库混合使用。我目前面临的问题是如何正确解码异步 StreamReader。
最简单的解决方案是 read()
字节字符串块,然后对每个块进行解码 - 请参见下面的代码。 (在我的程序中,我不会打印每个块,而是将其解码为字符串并将其发送到另一个方法中进行处理):
import asyncio
import aiohttp
async def get_data(port):
url = 'http://localhost:{}/'.format(port)
r = await aiohttp.get(url)
stream = r.content
while not stream.at_eof():
data = await stream.read(4)
print(data.decode('utf-8'))
这工作正常,直到有一个 utf-8 字符被分成太多块。例如,如果响应是 b'M\xc3\xa4dchen mit Bi\xc3\x9f\n'
,那么读取 3 个块将起作用,但 4 个块将不起作用(因为 \xc3
和 \x9f
在不同的块中并且解码以 \xc3
将引发以下错误:
UnicodeDecodeError: 'utf-8' codec can't decode byte 0xc3 in position 3: unexpected end of data
我查看了这个问题的正确解决方案,至少在阻塞世界中,似乎是 io.TextIOWrapper 或 codecs.StreamReaderWriter(它们的区别在 PEP 0400 中讨论) ).但是,这两者都依赖于典型的阻塞流。
我花了 30 分钟用 asyncio 搜索示例,并不断找到我的 decode() 解决方案。有谁知道更好的解决方案,或者这是 python 的 asyncio 中缺少的功能?
作为参考,以下是将两个 "standard" 解码器与异步流一起使用的结果。
使用编解码流reader:
r = yield from aiohttp.get(url)
decoder = codecs.getreader('utf-8')
stream = decoder(r.content)
异常:
File "echo_client.py", line 13, in get_data
data = yield from stream.read(4)
File "/usr/lib/python3.5/codecs.py", line 497, in read
data = self.bytebuffer + newdata
TypeError: can't concat bytes to generator
(它直接调用 read(),而不是 yield from
或 await
)
我也试过用 io.TextIOWrapper:
包装流stream = TextIOWrapper(r.content)
但这会导致以下结果:
File "echo_client.py", line 10, in get_data
stream = TextIOWrapper(r.content)
AttributeError: 'FlowControlStreamReader' object has no attribute 'readable'
P.S。如果你想要一个示例测试用例,请查看 this gist。您可以 运行 它与 python3.5 重现错误。如果将块大小从 4 更改为 3(或 30),它将正常工作。
编辑
接受的答案解决了这个问题。谢谢!如果其他人有这个问题,这里有一个简单的包装器 class 我用来处理 StreamReader 上的解码:
import codecs
class DecodingStreamReader:
def __init__(self, stream, encoding='utf-8', errors='strict'):
self.stream = stream
self.decoder = codecs.getincrementaldecoder(encoding)(errors=errors)
async def read(self, n=-1):
data = await self.stream.read(n)
if isinstance(data, (bytes, bytearray)):
data = self.decoder.decode(data)
return data
def at_eof(self):
return self.stream.at_eof()
您可以使用 IncrementalDecoder:
Utf8Decoder = codecs.getincrementaldecoder('utf-8')
以你为例:
decoder = Utf8Decoder(error='strict')
while not stream.at_eof():
data = await stream.read(4)
print(decoder.decode(data), end='')
输出:
Mädchen mit Biß