Reading and writing files between a GCS bucket and the Dataflow VM
I am trying to read a file from a GCS bucket (path: gs://bucket_name) and load it into a folder on the Dataflow VM (path: /tmp/file name).
I also need to copy another file from the Dataflow VM folder back to the GCS bucket.
I tried the apache_beam.io.gcp.gcsio library, but it does not seem to work.
Can anyone suggest a way to do this?
The best approach is to call the GCS Python client library (the GCS Python API) from the process method of a custom DoFn. The DoFn is triggered by sending elements into it: either by an Impulse (executed only once) or by a PCollection (executed once per element in the PCollection). Take a look here for downloading/uploading blobs and here for triggering a custom DoFn.
import apache_beam as beam
from google.cloud import storage

p = beam.Pipeline(...)
impulse = p | beam.Impulse()

class ReadWriteToGcs(beam.DoFn):
    def setup(self):
        # Create the GCS client once per worker, not once per element.
        self.client = storage.Client()

    def process(self, e):
        bucket = self.client.bucket(bucket_name)
        blob = bucket.blob(source_blob_name)
        # Download the blob to a local file on the Dataflow VM ...
        blob.download_to_filename(destination_file_name)
        # ... and upload a local file from the VM back to the bucket.
        blob.upload_from_filename(source_file_name)

impulse | beam.ParDo(ReadWriteToGcs())
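
The answer above also mentions that the DoFn can be driven by a PCollection so it runs once per element. Here is a minimal sketch of that variant, assuming a hypothetical bucket name and blob names (illustrative placeholders, not values from the question):

import os
import apache_beam as beam
from google.cloud import storage

class DownloadBlob(beam.DoFn):
    def setup(self):
        # One GCS client per worker.
        self.client = storage.Client()

    def process(self, blob_name):
        # "bucket_name" is a hypothetical placeholder.
        bucket = self.client.bucket("bucket_name")
        local_path = os.path.join("/tmp", os.path.basename(blob_name))
        # Copy the blob down to the Dataflow VM's local disk.
        bucket.blob(blob_name).download_to_filename(local_path)
        yield local_path  # hand the local path to downstream transforms

with beam.Pipeline() as p:
    (p
     | beam.Create(["input/a.csv", "input/b.csv"])  # hypothetical blob names
     | beam.ParDo(DownloadBlob()))

Each element of the Create triggers one process call, so every listed blob ends up as a file under /tmp on the worker that processed it.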