如何在 Google Cloud DataFlow 作业中从 GCS 读取 blob(pickle)文件?

How to read blob (pickle) files from GCS in a Google Cloud DataFlow job?

我尝试 运行 远程使用 pickle 文件的 DataFlow 管道。 在本地,我可以使用下面的代码来调用文件。

with open (known_args.file_path, 'rb') as fp:
     file = pickle.load(fp)

但是,当路径是关于云存储(gs://...) 时,我发现它无效:

IOError: [Errno 2] No such file or directory: 'gs://.../.pkl'

我有点理解为什么它不起作用,但我找不到正确的方法。

open() 是不理解 Google 云存储路径的标准 Python 库函数。您需要改用 Beam FileSystems API,它知道它以及 Beam 支持的其他文件系统。

如果您的 GCS 存储桶中有 pickle 文件,那么您可以将它们作为 BLOB 加载 并像在您的代码中一样进一步处理它们(使用 pickle.load()):

class ReadGcsBlobs(beam.DoFn):
    def process(self, element, *args, **kwargs):
        from apache_beam.io.gcp import gcsio
        gcs = gcsio.GcsIO()
        yield (element, gcs.open(element).read())


# usage example:
files = (p
         | "Initialize" >> beam.Create(["gs://your-bucket-name/pickle_file_path.pickle"])
         | "Read blobs" >> beam.ParDo(ReadGcsBlobs())
        )