
Reading and writing file between GCS bucket and Dataflow VM

I am trying to read a file from a GCS bucket (path: gs://bucket_name) and load it into a folder on the Dataflow VM (path: /tmp/file name).

I also need to copy another file from the Dataflow VM folder back to the GCS bucket.

I have tried the apache_beam.io.gcp.gcsio library, but it doesn't seem to work.

Can anyone offer any suggestions for this?

The best way is to call the GCS Python API from the GCS Python client library inside the process method of a custom DoFn. The DoFn is triggered by the elements sent to it: either an Impulse (executes only once) or a PCollection (executes once per element in the PCollection). Take a look here for downloading/uploading blobs and here for triggering a custom DoFn.

import apache_beam as beam
from google.cloud import storage

class ReadWriteToGcs(beam.DoFn):
  def setup(self):
    # Create the GCS client once per DoFn instance, not per element.
    self.client = storage.Client()

  def process(self, e):
    bucket = self.client.bucket(bucket_name)
    # Download the blob to a local path on the Dataflow VM (e.g. under /tmp).
    blob = bucket.blob(source_blob_name)
    blob.download_to_filename(destination_file_name)
    # Upload a local file from the VM back to the bucket.
    out_blob = bucket.blob(target_blob_name)  # placeholder name for the output blob
    out_blob.upload_from_filename(source_file_name)

p = beam.Pipeline(...)
impulse = p | beam.Impulse()
impulse | beam.ParDo(ReadWriteToGcs())
p.run()
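
If the copy should happen once per file rather than once per pipeline, you can drive the same kind of DoFn with a PCollection of blob names instead of an Impulse, as mentioned above. A minimal sketch, assuming a placeholder bucket name ('bucket_name') and placeholder blob names:

import apache_beam as beam
from google.cloud import storage

class DownloadFromGcs(beam.DoFn):
  def setup(self):
    self.client = storage.Client()

  def process(self, blob_name):
    # Each element is a blob name; download it to /tmp on the worker VM.
    bucket = self.client.bucket('bucket_name')  # placeholder bucket name
    bucket.blob(blob_name).download_to_filename('/tmp/' + blob_name)

with beam.Pipeline() as p:
  (p
   | beam.Create(['file_a.csv', 'file_b.csv'])  # placeholder blob names
   | beam.ParDo(DownloadFromGcs()))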