将 bytes 可迭代转换为 str 的可迭代,其中每个值都是一行

Convert a bytes iterable to an iterable of str, where each value is a line

我有一个bytes的可迭代对象,例如

bytes_iter = (
    b'col_1,',
    b'c',
    b'ol_2\n1',
    b',"val',
    b'ue"\n',
)

(但通常这 不会 被硬编码或一次全部可用,但由生成器提供)我想将其转换为 [=13 的可迭代对象=] 行,换行符在前面是未知的,但可以是 \r\n\r\n 中的任何一个。所以在这种情况下将是:

lines_iter = (
    'col_1,col_2',
    '1,"value"',
)

(但同样,只是作为一个可迭代对象,而不是一次全部存储在内存中)。

我该怎么做?

上下文:我的目标是将可迭代的 str 行传递给 csv.reader(我 认为 需要整行?),但我感兴趣的是这个回答一般。

我用了yield and re.finditer.

The yield expression is used when defining a generator function or an asynchronous generator function and thus can only be used in the body of a function definition. Using a yield expression in a function’s body causes that function to be a generator function

Return an iterator yielding match objects over all non-overlapping matches for the RE pattern in string. The string is scanned left-to-right, and matches are returned in the order found. Empty matches are included in the result.
If there are no groups, return a list of strings matching the whole pattern. If there is exactly one group, return a list of strings matching that group. If multiple groups are present, return a list of tuples of strings matching the groups. Non-capturing groups do not affect the form of the result.

正则表达式([^\r\n]*)(\r\n|\r|\n)?可以分为两部分进行匹配(即两组)。第一组匹配不含\r\n的数据,第二组匹配\r\n\r\n.

import re
find_rule = re.compile("([^\r\n]*)(\r\n|\r|\n)?")


def converter(byte_data):
    left_d = ""
    for d in byte_data:
        # Used to save the previous match result in the `for` loop
        prev_result = None
        # Concatenate the last part of the previous data with the current data,
        # used to deal with the case of `\r\n` being separated.
        d = left_d + d.decode()
        left_d = ""
        # Using `find_rule.finditer` the last value("") will be invalid
        for match_result in find_rule.finditer(d):
            i = match_result.group()
            if not i:
                # The program comes to this point, indicating that i == "", which is the last matching value
                left_d, prev_result = prev_result.group(), None
                continue
            if prev_result:
                if prev_result.group(2) is None:
                    # The program goes here, represented as the last valid value matched
                    left_d = prev_result.group()
                else:
                    # Returns the previous matched value
                    yield prev_result.group()
            # Save the current match result
            prev_result = match_result

    else:
        yield left_d


for i in (converter(iter((
        b'col_1,\r',
        b'\nc',
        b'ol_2\n1',
        b'\n,"val;\r',
        b'ue"\n')))
):
    print(repr(i))

输出:

'col_1,\r\n'
'col_2\n'
'1\n'
',"val;\r'
'ue"\n'

使用 io 模块为您完成大部分工作:

class ReadableIterator(io.IOBase):
    def __init__(self, it):
        self.it = iter(it)
    def read(self, n):
        # ignore argument, nobody actually cares
        # note that it is *critical* that we suppress the `StopIteration` here
        return next(self.it, b'')
    def readable(self):
        return True

然后直接调用 io.TextIOWrapper(ReadableIterator(some_iterable_of_bytes)).

也许我遗漏了一些重要的(或微妙的)东西,因为一些赞成的答案似乎比这更奇特,但我认为你可以解码和链接字节并使用 itertools.groupby 来获得字符串生成器:

from itertools import groupby, chain

bytes_iter = (
    b'col_1,',
    b'c',
    b'ol_2\n',
    b'1,"val;',
    b'ue"\n'
)

def make_strings(G):
    strings = chain.from_iterable(map(bytes.decode, G))
    for k, g in groupby(strings, key=lambda c: c not in '\n\r'):
        if k:
            yield ''.join(g)                            

list(make_strings(bytes_iter))
# ['col_1,col_2', '1,"val;ue"']

将@o11c 和@user2357112 放在一起支持 Monica 的贡献:

import codecs
import csv
import io

def yield_bytes():
    chunks = [
        b'col_1,',
        b'c',
        b'ol_2\n1',
        b',"val',
        b'ue"\n',
        b'Hello,'
        b'\xe4\xb8',
        b'\x96',
        b'\xe7',
        b'\x95\x8c\n'
        b'\n'
    ]

    for chunk in chunks:
        yield(chunk)

decoder = codecs.getincrementaldecoder('utf-8')()

def yield_encoded_bytes():
    s = None
    for bytes in yield_bytes():
        s = decoder.decode(bytes, final=False)
        if s:
            yield s.encode('utf-8')

class ReadableIterator(io.IOBase):
    def __init__(self, it):
        self.it = iter(it)
    def read(self, n):
        # ignore argument, nobody actually cares
        # note that it is *critical* that we suppress the `StopIteration` here
        return next(self.it, b'')
    def readable(self):
        return True

f = io.TextIOWrapper(ReadableIterator(yield_encoded_bytes()))

for row in csv.reader(f):
    print(row)

我得到:

['col_1', 'col_2']
['1', 'value']
['Hello', '世界']
[]