使用 python io 从缓冲流中组成一行 reader

Composing a line reader from a buffered stream using python io

我正在使用 python boto 与 s3 交互。我在 s3 上的文件是 CSV,我想在其中使用缓冲区从 s3 读取行以限制内存使用。

我想知道是否有人有办法编写 python 的 io 类 来实现这个目标?目标是拥有某种能够包装 boto 键的抽象,并在键上提供 readline 或迭代器接口(仅提供 read(size=0) 调用。复杂性在于,因为它是存储为 CSV 格式,每行长度可变。

目标是有一个抽象,我可以用它包装 python boto 键,然后实现迭代器协议,这样我就可以将它传递给 csv reader,我最终实现了它我自己。

看起来 python io 确实具备完成此 BufferedReader and TextIOWrapper, and I fooled around with it by naively trying to pass the boto Key to it, but BufferedReader expected an IOBase 对象的所有组件。

然后我围绕 Key 实现了 IOBase 协议,但遇到了 unicode 错误,并且通常不确定我在做什么。

有谁知道 python io 是否可以执行与上述类似的操作?


技术规格:

s3 上有一个包含 1-100 个 CSV 文件的目录。全部具有相同的格式,但行数可变。我正在尝试实现一个函数,该函数采用 boto Keys.

的迭代器

Key提供了一个read(num_bytes)方法。

def yield_lines(keys_iterator):
   # had to custom implement this
   # any way using io??
   # yield each CSV row across keys that only provide `read()` method

我最初的尝试是尝试让 boto Key 遵守 IOBase。我会用缓冲的 reader 组合它,然后尝试使用 TextIOWrapper 从中读取行,但 运行 进入 readinto 的编码问题。

class IOCompatibleKey(object):

   def __init__(self, s3_key):
      self.s3_key = s3_key

   def readable(self):
      return True

   def writeable(self):
      return False

   def read(num_bytes):
      return self.s3_key.read(num_bytes)

   def readinto(n):
      # .... ?????

buffered_reader = BufferedReader(IOCompatibleKey(s3_key))
text_reader = TextIOWrapper(buffered_reader)
for line in text_reader: # <- IS THIS POSSIBLE????
    print(line)

在 Python 2 中,您想 避免 一个 TextIOWrapper 对象,因为 csv.reader() 对象需要一个字节串。它无法处理 TextIOWrapper 提供的 unicode 个对象。

提供 IOBase 实现在其他方面足够简单:

class IOCompatibleKey(object):    
    def __init__(self, s3_key):
        self.s3_key = s3_key

    def readable(self):
        return True

    def writeable(self):
        return False

    @property
    def closed(self):
        return self.s3_key.closed

    def close(self):
        self.s3_key.close()

    def read(self, num_bytes):
        return self.s3_key.read(num_bytes)

    def readinto(self, n):
        chunk = self.s3_key.read(len(n))
        read = len(chunk)
        n[:read] = chunk
        return read

并且仅在使用 Python 2:

时使用 BufferedReader
buffered_reader = BufferedReader(IOCompatibleKey(s3_key))
csv_reader = csv.reader(buffered_reader)
for row in csv_reader:
    print(row)

在 Python 3 上,只需在 BufferedReader()

之上添加一个 TextIOWrapper()

Python2 中的演示,使用模拟键:

>>> import random, csv
>>> from io import BufferedReader
>>> class Key(object):
...     closed = False
...     def read(self, bytes=1024):
...         if random.random() < 0.2:
...             bytes = random.randrange(bytes)
...         return ''.join([random.choice('abcdefghijklmnopqrstuvwxyz \n,') for _ in range(bytes)])
...
>>> s3_key = Key()
>>> buffered_reader = BufferedReader(IOCompatibleKey(s3_key))
>>> next(buffered_reader)   # produces a single \n terminated line
'nffdahuitmdaktibxjsdgyhlyfm gurfyo,nt\n'
>>> reader = csv.reader(buffered_reader)  # which satisfies csv.reader
>>> next(reader)
['bi iydribq', 'u']
>>> next(reader)
['qzxtbhkk se', 'v', 'b', 'nunyjemtkxaphuqmvgfrfjdloxwohqamdtvfqgddfna cjuzpaotccenxhhhgnvrbey']