如何使用 apache beam 将列表对象写入 JSON 文件?

How to write list object into a JSON file using apache beam?

我有一个字典元素列表,如下所示。

list_data = [
    {"id":"1", "name":"Cow", "type": "animal"},
    {"id":"2", "name":"Lion", "type": "animal"},
    {"id":"3", "name":"Peacock", "type": "bird"},
    {"id":"4", "name":"Giraffe", "type": "animal"}
]

我希望使用 Apache Beam 管道将上述列表写入 JSON 文件。

我试过这样做:

class BeamProcess:

    def process_data():

        json_file_path = "gs://my_bucket/df_output/output.json"
        
        list_data = [
            {"id":"1", "name":"Cow", "type": "animal"},
            {"id":"2", "name":"Lion", "type": "animal"},
            {"id":"3", "name":"Peacock", "type": "bird"},
            {"id":"4", "name":"Giraffe", "type": "animal"}
        ]

        argv = [
                '--project=<my_project>',
                '--region=<region>',
                '--job_name=<custom_name>',
                '--temp_location=<temporary_location>',
                '--runner=DataflowRunner'
            ]

        p = beam.Pipeline(argv=argv)

        (
                p
                | 'Create' >> beam.Create(list_data)
                | 'Write Output' >> beam.io.WriteToText(json_file_path, shard_name_template='')
        )
        p.run().wait_until_finish()


if __name__ == "__main__":
    beam_proc = BeamProcess()
    beam_proc.process_data()

当我执行上面的代码时,我最终在 output.json 文件中看到以下行。

{"id":"1", "name":"Cow", "type": "animal"}
{"id":"2", "name":"Lion", "type": "animal"}
{"id":"3", "name":"Peacock", "type": "bird"}
{"id":"4", "name":"Giraffe", "type": "animal"}

但我希望看到的是:

[
    {"id":"1", "name":"Cow", "type": "animal"},
    {"id":"2", "name":"Lion", "type": "animal"},
    {"id":"3", "name":"Peacock", "type": "bird"},
    {"id":"4", "name":"Giraffe", "type": "animal"}
]

使用 apache beam 将 python 列表对象写入 JSON 文件的正确方法是什么?

当给 beam.Create 一个列表时,它会将其解释为生成的 PCollection 的元素列表。当您将 PCollection 写成文本时,您输出的是四个单独的元素而不是一个列表,这就是为什么它的格式不符合您的预期。

beam.Create([1, 2, 3, 4]) # Creates a PCollection of four int elements.

因此,为了创建一个包含列表作为元素的 PCollection,您需要嵌套要用作元素的列表,如下所示:

beam.Create([[1, 2, 3, 4]]) # Creates a PCollection of one list element.