如何在 apache beam 中使用 ParDo 和 DoFn 写入 GCS

how to write to GCS with a ParDo and a DoFn in apache beam

使用 apache_beam.io.filesystems.FileSystems 如何使用 ParDo 和 DoFn 写入 GCS ??我已经从 pardo 获得 csv 格式的输出,我是否需要编写另一个 pardo 将其写入 gcs,或者我可以直接导入模块以将其直接写入 gcs 吗?请帮忙

我有一个示例 here,其中我使用 apache_beam.io.filesystems.FileSystems 将 b64 编码的图像写入 GCS。管道的最后一步将 b64 作为包含两个字段 key_idimage 的 PCollection 并应用 ParDo:

b64 | 'Save images' >> beam.ParDo(WriteToSeparateFiles(known_args.output))

其中known_args.output是GCS基本路径,WriteToSeparateFiles如下:

class WriteToSeparateFiles(beam.DoFn):
    def __init__(self, outdir):
        self.outdir = outdir
    def process(self, element):
        writer = filesystems.FileSystems.create(self.outdir + element['key_id'] + '.png')
        writer.write(element['image'])
        writer.close()

使用 filesystems.FileSystems.create() 我可以控制目标路径。对于基本路径,我使用传递给函数的参数,并使用每个元素的 key_id 来生成有意义的文件名。最后,我在写图片时附加了 .png 扩展名。

我使用writer.write(element['image'])为每个文件保存image字段的内容并使用writer.close()关闭流。