Convert a bytes iterable to an iterable of str, where each value is a line
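I have an iterable of bytes, such as

bytes_iter = (
    b'col_1,',
    b'c',
    b'ol_2\n1',
    b',"val',
    b'ue"\n',
)

(but typically this would not be hard-coded or available all at once; it would be supplied by a generator). I want to convert it to an iterable of str lines, where the line breaks are unknown up front but could be any of \r, \n or \r\n. So in this case the result would be:

lines_iter = (
    'col_1,col_2',
    '1,"value"',
)

(but again, just as an iterable, not all held in memory at once).

How can I do this?

Context: my aim is to pass the iterable of str lines to csv.reader (which I think needs whole lines?), but I'm interested in the answer in general.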
I used yield and re.finditer.
The yield expression is used when defining a generator function or an asynchronous generator function and thus can only be used in the body of a function definition. Using a yield expression in a function’s body causes that function to be a generator function
Return an iterator yielding match objects over all non-overlapping matches for the RE pattern in string. The string is scanned left-to-right, and matches are returned in the order found. Empty matches are included in the result.
If there are no groups, return a list of strings matching the whole pattern. If there is exactly one group, return a list of strings matching that group. If multiple groups are present, return a list of tuples of strings matching the groups. Non-capturing groups do not affect the form of the result.
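The regular expression ([^\r\n]*)(\r\n|\r|\n)? matches in two parts (that is, two groups). The first group matches data that contains no \r or \n, and the second group matches \r, \n, or \r\n.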
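To make the two groups concrete, here is a small sketch that runs the same pattern over a made-up sample string (the string `text` is my own illustration, not from the question). It also shows the trailing empty match that the code below has to skip.

```python
import re

find_rule = re.compile("([^\r\n]*)(\r\n|\r|\n)?")

# Hypothetical sample mixing all three line endings, with an unterminated tail
text = "a,b\r\nc\rd\ne"

# group(1) is the line content, group(2) the terminator (None if absent)
pairs = [(m.group(1), m.group(2)) for m in find_rule.finditer(text)]
for content, terminator in pairs:
    print(repr(content), repr(terminator))
```

The last pair is always `('', None)`: because both groups can match nothing, finditer reports one empty match at the end of the string.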
import re

find_rule = re.compile("([^\r\n]*)(\r\n|\r|\n)?")


def converter(byte_data):
    left_d = ""
    for d in byte_data:
        # Holds the previous match within this chunk
        prev_result = None
        # Prepend what was left over from the previous chunk, so that a
        # `\r\n` split across two chunks is seen as one terminator.
        # Note: d.decode() assumes no multi-byte character is split across chunks.
        d = left_d + d.decode()
        left_d = ""
        # `find_rule.finditer` always ends with an empty match (""), which is not a line
        for match_result in find_rule.finditer(d):
            i = match_result.group()
            if not i:
                # i == "" means this is the final, empty match: carry the last
                # real match over to the next chunk. prev_result is None only
                # when the whole chunk was empty.
                left_d = prev_result.group() if prev_result else ""
                prev_result = None
                continue
            if prev_result:
                if prev_result.group(2) is None:
                    # The previous match has no terminator, so it is the last valid value
                    left_d = prev_result.group()
                else:
                    # The previous match is a complete line, so emit it
                    yield prev_result.group()
            # Save the current match result
            prev_result = match_result
    else:
        yield left_d
for i in converter(iter((
    b'col_1,\r',
    b'\nc',
    b'ol_2\n1',
    b'\n,"val;\r',
    b'ue"\n',
))):
    print(repr(i))
Output:
'col_1,\r\n'
'col_2\n'
'1\n'
',"val;\r'
'ue"\n'
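The converter above decodes each chunk on its own, so it would presumably fail if a multi-byte character were split across two chunks. Below is a rough sketch of the same hold-back idea combined with an incremental decoder from codecs. The names `iter_lines` and `LINE` are mine, and UTF-8 is an assumption; treat it as one possible variant rather than a drop-in replacement.

```python
import codecs
import re

# A run of non-newline characters followed by exactly one terminator
LINE = re.compile(r"[^\r\n]*(?:\r\n|\r|\n)")


def iter_lines(chunks, encoding="utf-8"):
    """Yield str lines (terminators kept) from an iterable of bytes chunks."""
    decoder = codecs.getincrementaldecoder(encoding)()
    pending = ""
    for chunk in chunks:
        # The decoder buffers incomplete multi-byte sequences internally
        pending += decoder.decode(chunk)
        pos = 0
        for m in LINE.finditer(pending):
            # A "\r" at the very end of the buffer may be the first half of
            # "\r\n", so hold that line back until more data arrives.
            if m.end() == len(pending) and m.group().endswith("\r"):
                break
            yield m.group()
            pos = m.end()
        pending = pending[pos:]
    # Flush the decoder; this raises if the input ends mid-character
    pending += decoder.decode(b"", final=True)
    if pending:
        yield pending


sample = [b'Hello,\xe4\xb8', b'\x96\xe7', b'\x95\x8c\r', b'\nend']
result = list(iter_lines(sample))
print(result)
```

Appending to `pending` keeps the sketch short; for very long lines a list of pieces might be cheaper.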
Use the io module to do most of the work for you:

import io


class ReadableIterator(io.IOBase):
    def __init__(self, it):
        self.it = iter(it)

    def read(self, n):
        # ignore argument, nobody actually cares
        # note that it is *critical* that we suppress the `StopIteration` here
        return next(self.it, b'')

    def readable(self):
        return True

Then just call io.TextIOWrapper(ReadableIterator(some_iterable_of_bytes)).
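As a usage sketch, here is that wrapper applied to the chunks from the question. The explicit `encoding='utf-8'` is my assumption (without it TextIOWrapper uses the locale's default), and the line terminators are stripped only to match the output the question asks for.

```python
import io


class ReadableIterator(io.IOBase):
    """Adapter from the answer: exposes an iterable of bytes as a raw stream."""

    def __init__(self, it):
        self.it = iter(it)

    def read(self, n):
        # The size hint is ignored; an exhausted iterator reads as b'' (EOF)
        return next(self.it, b'')

    def readable(self):
        return True


bytes_iter = (b'col_1,', b'c', b'ol_2\n1', b',"val', b'ue"\n')

text = io.TextIOWrapper(ReadableIterator(bytes_iter), encoding='utf-8')
# Iterating a text stream yields one line at a time, terminator included
lines = [line.rstrip('\r\n') for line in text]
print(lines)
```

The list comprehension is only there to show the result; in real use you would iterate `text` lazily or hand it straight to csv.reader.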
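Maybe I'm missing something important (or subtle), since some of the upvoted answers seem more exotic than this, but I think you can decode and chain the bytes and use itertools.groupby to get a generator of strings: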
from itertools import groupby, chain

bytes_iter = (
    b'col_1,',
    b'c',
    b'ol_2\n',
    b'1,"val;',
    b'ue"\n'
)


def make_strings(G):
    strings = chain.from_iterable(map(bytes.decode, G))
    for k, g in groupby(strings, key=lambda c: c not in '\n\r'):
        if k:
            yield ''.join(g)


list(make_strings(bytes_iter))
# ['col_1,col_2', '1,"val;ue"']
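Two things seem worth checking before relying on this (my reading of the code, not something the answer claims): a run of consecutive separators is grouped as one, so blank lines disappear, and bytes.decode is applied per chunk, so a character split across chunks raises. A small sketch with made-up inputs:

```python
from itertools import chain, groupby


def make_strings(G):
    # Copied from the answer above
    strings = chain.from_iterable(map(bytes.decode, G))
    for k, g in groupby(strings, key=lambda c: c not in '\n\r'):
        if k:
            yield ''.join(g)


# A blank line between 'a' and 'b' is swallowed by the grouping
with_blank = list(make_strings([b'a\r\n', b'\r\n', b'b\n']))
print(with_blank)

# A UTF-8 character split across two chunks cannot be decoded chunk by chunk
try:
    list(make_strings([b'\xe4\xb8', b'\x96\n']))
    split_ok = True
except UnicodeDecodeError:
    split_ok = False
print(split_ok)
```

For CSV input the lost blank lines may not matter, but quoted fields containing newlines probably would.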
Putting together the contributions from @o11c and @user2357112 supports Monica:
import codecs
import csv
import io


def yield_bytes():
    chunks = [
        b'col_1,',
        b'c',
        b'ol_2\n1',
        b',"val',
        b'ue"\n',
        b'Hello,',
        b'\xe4\xb8',
        b'\x96',
        b'\xe7',
        b'\x95\x8c\n',
        b'\n',
    ]
    for chunk in chunks:
        yield chunk


decoder = codecs.getincrementaldecoder('utf-8')()


def yield_encoded_bytes():
    # Re-encode only complete characters, so no yielded chunk ends mid-character
    for chunk in yield_bytes():
        s = decoder.decode(chunk, final=False)
        if s:
            yield s.encode('utf-8')


class ReadableIterator(io.IOBase):
    def __init__(self, it):
        self.it = iter(it)

    def read(self, n):
        # ignore argument, nobody actually cares
        # note that it is *critical* that we suppress the `StopIteration` here
        return next(self.it, b'')

    def readable(self):
        return True


# Pass the encoding explicitly rather than relying on the locale default
f = io.TextIOWrapper(ReadableIterator(yield_encoded_bytes()), encoding='utf-8')
for row in csv.reader(f):
    print(row)
I get:
['col_1', 'col_2']
['1', 'value']
['Hello', '世界']
[]
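As far as I can tell, TextIOWrapper already decodes incrementally, so the re-encoding step may not be strictly required. What does matter is that `read()` never returns b'' before the end, because that is taken as end of stream. Here is a sketch under those assumptions, with made-up chunks that include a split character and an empty chunk:

```python
import csv
import io


class ReadableIterator(io.IOBase):
    """Same adapter as in the answers above."""

    def __init__(self, it):
        self.it = iter(it)

    def read(self, n):
        return next(self.it, b'')

    def readable(self):
        return True


# Hypothetical input: both characters are split across chunks, plus one empty chunk
chunks = [b'Hello,\xe4', b'\xb8\x96\xe7', b'', b'\x95\x8c\n', b'1,2\n']

# Drop empty chunks so they are not mistaken for EOF
non_empty = (c for c in chunks if c)

# newline='' leaves line endings untouched, as the csv docs recommend
f = io.TextIOWrapper(ReadableIterator(non_empty), encoding='utf-8', newline='')
rows = list(csv.reader(f))
print(rows)
```

If the empty-chunk filter is left out, reading stops at the b'' and the remaining rows are never seen.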