Opening a gzip file in python Apache Beam

Is it currently possible to read gzip files in Python with Apache Beam? My pipeline is pulling gzip files from GCS with this line of code:

beam.io.Read(beam.io.TextFileSource('gs://bucket/file.gz', compression_type='GZIP')) 

But I'm getting this error:

UnicodeDecodeError: 'utf8' codec can't decode byte 0x8b in position 1: invalid start byte

We noticed in the python beam source code that compressed files only seem to be handled when writing to a sink. https://github.com/apache/incubator-beam/blob/python-sdk/sdks/python/apache_beam/io/fileio.py#L445
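The byte 0x8b in the error message is telling: 0x1f 0x8b is the gzip magic number, so the coder is being handed the raw compressed bytes rather than decompressed text. A minimal stdlib sketch (independent of Beam) reproducing the same failure:

```python
import gzip

# Compress some text the way a .gz file on GCS would be stored.
raw = gzip.compress("some log line\n".encode("utf-8"))

# The first two bytes are the gzip magic number: 0x1f 0x8b.
print(raw[:2])  # b'\x1f\x8b'

# Feeding the compressed bytes straight to a UTF-8 decoder fails at
# position 1 (0x8b is an invalid start byte), which is exactly what
# the Beam coder is doing in the traceback below.
try:
    raw.decode("utf-8")
except UnicodeDecodeError as e:
    print(e)

# Decompressing first succeeds.
print(gzip.decompress(raw).decode("utf-8"))
```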

A more detailed traceback:

Traceback (most recent call last):
  File "beam-playground.py", line 11, in <module>
    p.run() 
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/pipeline.py", line 159, in run
    return self.runner.run(self)
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/runners/direct_runner.py", line 103, in run
    super(DirectPipelineRunner, self).run(pipeline)
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 98, in run
    pipeline.visit(RunVisitor(self))
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/pipeline.py", line 182, in visit
    self._root_transform().visit(visitor, self, visited)
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/pipeline.py", line 419, in visit
    part.visit(visitor, pipeline, visited)
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/pipeline.py", line 422, in visit
    visitor.visit_transform(self)
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 93, in visit_transform
    self.runner.run_transform(transform_node)
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 168, in run_transform
    return m(transform_node)
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/runners/direct_runner.py", line 99, in func_wrapper
    func(self, pvalue, *args, **kwargs)
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/runners/direct_runner.py", line 258, in run_Read
    read_values(reader)
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/runners/direct_runner.py", line 245, in read_values
    read_result = [GlobalWindows.windowed_value(e) for e in reader]
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/io/fileio.py", line 807, in __iter__
    yield self.source.coder.decode(line)
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/coders/coders.py", line 187, in decode
    return value.decode('utf-8')
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/encodings/utf_8.py", line 16, in decode
    return codecs.utf_8_decode(input, errors, True)
UnicodeDecodeError: 'utf8' codec can't decode byte 0x8b in position 1: invalid start byte

Update: TextIO in the Python SDK now supports reading from compressed files.

As of today, TextIO in the Python SDK does not actually support reading from compressed files.

I ran into a similar problem. I have a custom binary source that I wanted to parse and grab data from. The problem is that the file.io API is built around CSV or Avro, and no matter what I tried it would not give me the contents without trying to split them on newlines. As you can imagine, a binary file does not handle this well.

At first I tried a custom source; that ended up requiring 3 classes and was duplicating core Dataflow/Beam code. In the end I wrote this little bit of monkeypatching to do what I needed (deep source-level hacking, so caveat emptor).

import apache_beam as beam
from apache_beam.io.fileio import coders

def _TextFileReader__iter(self):
    # The full file contents are available here and can be read as normal.
    # You can even limit how many bytes you read (I read 9 to grab the
    # file-format header).
    data = self._file.read()
    # Now you can either yield the whole file as a single data entry
    # and run a ParDo to split it, or you can iterate in here and
    # yield each row. I chose the latter, but I'm showing an example
    # of the former.
    yield data

# Apply the monkeypatch.
beam.io.fileio.TextFileReader.__iter__ = _TextFileReader__iter

To invoke this source and make sure it reads the file as binary, I did the following:

pipeline | 'start_3' >> beam.io.Read(
    beam.io.TextFileSource( 'gs://MY_BUCKET/sample.bin',
        coder=coders.BytesCoder()
    )
)

Notice the coders.BytesCoder()? Without it, the reader tries to convert the bytes to something non-binary, which was bad for my parsing engine. ;)

It took me a good day to figure this out. But if you use this method, you can do almost anything with the file.io class in Dataflow. ;)
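If you take the "yield the whole file" route, a downstream ParDo still has to split the blob into records itself. For a hypothetical fixed-length binary format, that split can be as simple as the following (the 8-byte layout of one little-endian uint32 id plus one float32 value is made up for illustration):

```python
import struct

def split_records(blob, record_size=8):
    """Split a binary blob into fixed-length records.

    The '<If' layout (uint32 id + float32 value, little-endian) is
    purely illustrative; substitute your own format.
    """
    for offset in range(0, len(blob), record_size):
        chunk = blob[offset:offset + record_size]
        if len(chunk) == record_size:
            yield struct.unpack('<If', chunk)

# Build a fake two-record blob and split it back apart.
blob = struct.pack('<If', 1, 0.5) + struct.pack('<If', 2, 1.5)
print(list(split_records(blob)))  # [(1, 0.5), (2, 1.5)]
```

In a pipeline this function would be the body of a `beam.FlatMap` applied to the single-element PCollection produced by the patched reader.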

I ran into the same problem. I was trying to read binary GZ files from GCS, decompress them, and ship them somewhere else for processing. I solved it in two steps.

First, make sure you're using the correct Python library; mine was originally out of date (use at least v0.4): pip install --upgrade google-cloud-dataflow.

Second, I constructed the pipeline as follows:

import apache_beam as beam
from apache_beam import (coders, io, transforms)

raw_logs = (p
            | io.Read("ReadLogsFromGCS", beam.io.TextFileSource(
                      "gs://my-bucket/logs-*.gz",
                      coder=coders.BytesCoder()))
            | transforms.Map(lambda x: x)
            | io.Write("WriteToLocalhost", io.textio.WriteToText(
                       "/tmp/flattened-logs",
                       file_name_suffix=".json")))
p.run()

You should have a file called /tmp/flattened-logs.json after running the pipeline.
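For a quick local sanity check of the decompress-and-flatten step, the same read can be reproduced with the stdlib `gzip` module (file and field names here are illustrative):

```python
import gzip
import os
import tempfile

# Write a small gzipped "log" file, as it might sit in the bucket.
path = os.path.join(tempfile.mkdtemp(), "logs-000.gz")
with gzip.open(path, "wt", encoding="utf-8") as f:
    f.write('{"event": "a"}\n{"event": "b"}\n')

# Read it back line by line -- this is what a text source with working
# compression support does to produce the elements of the PCollection.
with gzip.open(path, "rt", encoding="utf-8") as f:
    lines = [line.rstrip("\n") for line in f]

print(lines)  # ['{"event": "a"}', '{"event": "b"}']
```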