open_file beam.io FileBasedSource 问题 python 3

open_file in beam.io FileBasedSource issue with python 3

我正在使用 CSVRecordSource 读取在 read_records 函数中使用 open_file 的 Apache Beam 管道中的 CSV。

使用 python 2 一切正常,但是当我迁移到 python 3 时,它会在下面抱怨

next(csv_reader)
_csv.Error: iterator should return strings, not bytes (did you open the file in text mode?)

默认open_file方法以二进制模式打开文件。

所以我改成使用

with open(filename, "rt") as f:

但是当我运行 Google 云中的数据流时失败,因为它无法找到文件并给出错误

FileNotFoundError: [Errno 2] No such file or directory

下面是我的代码

 with self.open_file(filename) as f:
      csv_reader = csv.reader(f, delimiter=self.delimiter, quotechar=self.quote_character)
      header = next(csv_reader)

如何将 CSVRecordSource 与 python 3 一起使用?

您是否使用此处定义的 open_file 方法:https://github.com/apache/beam/blob/6f6feaaeebfc82302ba83c52d087b06a12a5b119/sdks/python/apache_beam/io/filebasedsource.py#L166?

如果是这样,我认为您可以调用基础 FileSystems.open(),将 'application/octet-stream' 替换为 'text/plain'

我通过使用迭代解码迭代器提供的输入(字节)的 iterdecode 解决了这个问题

csv.reader(codecs.iterdecode(f, "utf-8"), delimiter=self.delimiter, quotechar=self.quote_character)