如何从 GCP 存储桶读取 Apache Beam 中的多个文件
How to read multiple files in Apache Beam from GCP bucket
我正在尝试使用 Apache Beam 读取 GCP 中的多个文件并对其应用一些子集。我准备了两个管道,它们只适用于一个文件,但是当我在多个文件上尝试时失败了。除此之外,如果可能的话,我会很方便地将我的管道合并为一个,或者有没有办法对它们进行编排,以便它们按顺序工作。现在管道在本地工作,但我的最终目标是 运行 使用 Dataflow。
我 textio.ReadFromText 和 textio.ReadAllFromText,但我无法在多个文件的情况下使两者都起作用。
def toJson(file):
with open(file) as f:
return json.load(f)
with beam.Pipeline(options=PipelineOptions()) as p:
files = (p
| beam.io.textio.ReadFromText("gs://my_bucket/file1.txt.gz", skip_header_lines = 0)
| beam.io.WriteToText("/home/test",
file_name_suffix=".json", num_shards=1 , append_trailing_newlines = True))
with beam.Pipeline(options=PipelineOptions()) as p:
lines = (p
| 'read_data' >> beam.Create(['test-00000-of-00001.json'])
| "toJson" >> beam.Map(toJson)
| "takeItems" >> beam.FlatMap(lambda line: line["Items"])
| "takeSubjects" >> beam.FlatMap(lambda line: line['data']['subjects'])
| beam.combiners.Count.PerElement()
| beam.io.WriteToText("/home/items",
file_name_suffix=".txt", num_shards=1 , append_trailing_newlines = True))
这两个流水线对于单个文件运行良好,但我有上百个相同格式的文件,想利用并行计算的优势。
有没有办法让这个管道对同一目录下的多个文件起作用?
是否可以在单个管道中执行此操作而不是创建两个不同的管道? (从桶中向工作节点写入文件并不方便。)
我解决了如何使其适用于多个文件,但无法在单个管道中实现 运行。我使用 for 循环,然后使用 beam.Flatten 选项。
这是我的解决方案:
file_list = ["gs://my_bucket/file*.txt.gz"]
res_list = ["/home/subject_test_{}-00000-of-00001.json".format(i) for i in range(len(file_list))]
with beam.Pipeline(options=PipelineOptions()) as p:
for i,file in enumerate(file_list):
(p
| "Read Text {}".format(i) >> beam.io.textio.ReadFromText(file, skip_header_lines = 0)
| "Write TExt {}".format(i) >> beam.io.WriteToText("/home/subject_test_{}".format(i),
file_name_suffix=".json", num_shards=1 , append_trailing_newlines = True))
pcols = []
with beam.Pipeline(options=PipelineOptions()) as p:
for i,res in enumerate(res_list):
pcol = (p | 'read_data_{}'.format(i) >> beam.Create([res])
| "toJson_{}".format(i) >> beam.Map(toJson)
| "takeItems_{}".format(i) >> beam.FlatMap(lambda line: line["Items"])
| "takeSubjects_{}".format(i) >> beam.FlatMap(lambda line: line['data']['subjects']))
pcols.append(pcol)
out = (pcols
| beam.Flatten()
| beam.combiners.Count.PerElement()
| beam.io.WriteToText("/home/items",
file_name_suffix=".txt", num_shards=1 , append_trailing_newlines = True))
我正在尝试使用 Apache Beam 读取 GCP 中的多个文件并对其应用一些子集。我准备了两个管道,它们只适用于一个文件,但是当我在多个文件上尝试时失败了。除此之外,如果可能的话,我会很方便地将我的管道合并为一个,或者有没有办法对它们进行编排,以便它们按顺序工作。现在管道在本地工作,但我的最终目标是 运行 使用 Dataflow。
我 textio.ReadFromText 和 textio.ReadAllFromText,但我无法在多个文件的情况下使两者都起作用。
def toJson(file):
with open(file) as f:
return json.load(f)
with beam.Pipeline(options=PipelineOptions()) as p:
files = (p
| beam.io.textio.ReadFromText("gs://my_bucket/file1.txt.gz", skip_header_lines = 0)
| beam.io.WriteToText("/home/test",
file_name_suffix=".json", num_shards=1 , append_trailing_newlines = True))
with beam.Pipeline(options=PipelineOptions()) as p:
lines = (p
| 'read_data' >> beam.Create(['test-00000-of-00001.json'])
| "toJson" >> beam.Map(toJson)
| "takeItems" >> beam.FlatMap(lambda line: line["Items"])
| "takeSubjects" >> beam.FlatMap(lambda line: line['data']['subjects'])
| beam.combiners.Count.PerElement()
| beam.io.WriteToText("/home/items",
file_name_suffix=".txt", num_shards=1 , append_trailing_newlines = True))
这两个流水线对于单个文件运行良好,但我有上百个相同格式的文件,想利用并行计算的优势。
有没有办法让这个管道对同一目录下的多个文件起作用?
是否可以在单个管道中执行此操作而不是创建两个不同的管道? (从桶中向工作节点写入文件并不方便。)
我解决了如何使其适用于多个文件,但无法在单个管道中实现 运行。我使用 for 循环,然后使用 beam.Flatten 选项。
这是我的解决方案:
file_list = ["gs://my_bucket/file*.txt.gz"]
res_list = ["/home/subject_test_{}-00000-of-00001.json".format(i) for i in range(len(file_list))]
with beam.Pipeline(options=PipelineOptions()) as p:
for i,file in enumerate(file_list):
(p
| "Read Text {}".format(i) >> beam.io.textio.ReadFromText(file, skip_header_lines = 0)
| "Write TExt {}".format(i) >> beam.io.WriteToText("/home/subject_test_{}".format(i),
file_name_suffix=".json", num_shards=1 , append_trailing_newlines = True))
pcols = []
with beam.Pipeline(options=PipelineOptions()) as p:
for i,res in enumerate(res_list):
pcol = (p | 'read_data_{}'.format(i) >> beam.Create([res])
| "toJson_{}".format(i) >> beam.Map(toJson)
| "takeItems_{}".format(i) >> beam.FlatMap(lambda line: line["Items"])
| "takeSubjects_{}".format(i) >> beam.FlatMap(lambda line: line['data']['subjects']))
pcols.append(pcol)
out = (pcols
| beam.Flatten()
| beam.combiners.Count.PerElement()
| beam.io.WriteToText("/home/items",
file_name_suffix=".txt", num_shards=1 , append_trailing_newlines = True))