如何在 Google Cloud DataFlow 作业中从 GCS 读取 blob(pickle)文件?
How to read blob (pickle) files from GCS in a Google Cloud DataFlow job?
我尝试 运行 远程使用 pickle 文件的 DataFlow 管道。
在本地,我可以使用下面的代码来调用文件。
with open (known_args.file_path, 'rb') as fp:
file = pickle.load(fp)
但是,当路径是关于云存储(gs://...) 时,我发现它无效:
IOError: [Errno 2] No such file or directory: 'gs://.../.pkl'
我有点理解为什么它不起作用,但我找不到正确的方法。
open()
是不理解 Google 云存储路径的标准 Python 库函数。您需要改用 Beam FileSystems
API,它知道它以及 Beam 支持的其他文件系统。
如果您的 GCS 存储桶中有 pickle 文件,那么您可以将它们作为 BLOB 加载 并像在您的代码中一样进一步处理它们(使用 pickle.load()
):
class ReadGcsBlobs(beam.DoFn):
def process(self, element, *args, **kwargs):
from apache_beam.io.gcp import gcsio
gcs = gcsio.GcsIO()
yield (element, gcs.open(element).read())
# usage example:
files = (p
| "Initialize" >> beam.Create(["gs://your-bucket-name/pickle_file_path.pickle"])
| "Read blobs" >> beam.ParDo(ReadGcsBlobs())
)
我尝试 运行 远程使用 pickle 文件的 DataFlow 管道。 在本地,我可以使用下面的代码来调用文件。
with open (known_args.file_path, 'rb') as fp:
file = pickle.load(fp)
但是,当路径是关于云存储(gs://...) 时,我发现它无效:
IOError: [Errno 2] No such file or directory: 'gs://.../.pkl'
我有点理解为什么它不起作用,但我找不到正确的方法。
open()
是不理解 Google 云存储路径的标准 Python 库函数。您需要改用 Beam FileSystems
API,它知道它以及 Beam 支持的其他文件系统。
如果您的 GCS 存储桶中有 pickle 文件,那么您可以将它们作为 BLOB 加载 并像在您的代码中一样进一步处理它们(使用 pickle.load()
):
class ReadGcsBlobs(beam.DoFn):
def process(self, element, *args, **kwargs):
from apache_beam.io.gcp import gcsio
gcs = gcsio.GcsIO()
yield (element, gcs.open(element).read())
# usage example:
files = (p
| "Initialize" >> beam.Create(["gs://your-bucket-name/pickle_file_path.pickle"])
| "Read blobs" >> beam.ParDo(ReadGcsBlobs())
)