我可以分块处理 apache beam 中的 pcollections 吗?我可以分批处理 pcollection 并分别处理每批吗?
Can I process pcollections in apache beam in chunks? Can I make batches of pcollection and process each batch separately?
我在 GCS 上有大约 2000 个文件,我想处理所有文件并将它们上传到 BigQuery。当我使用管道进行处理时,这些管道无法自行完成。它在处理 70% 的文件后失败或停止工作。
我决定每批制作 100 个文件。我使用 BatchElements()
这会在列表中创建一个列表,即二维列表。
这些批次是否要单独执行。意味着第一批的 pcollections 将在第二批开始工作后自行执行(从 GCS 打开文件并将其上传到 BigQuery)。
这是串行工作还是并行工作?
如果任何批次失败,我的工作会失败吗?
xml = (
p1
| "Get xml name" >> beam.Create(gz_names)
| "Batch elements" >> BatchElements(100)
| "Open file" >> beam.ParDo(ReadGCS())
| "Create XML" >> beam.ParDo(CreateXML())
| "Parse Json" >> beam.ParDo(JsonParser())
| "Remove elements" >> beam.ParDo(Filtering())
| "Build Json" >> beam.ParDo(JsonBuilder())
| "Write elements" >> beam.io.WriteToText(file_path_prefix="output12", file_name_suffix=".json")
)
p1.run()
Beam 和其他大数据系统尝试执行优化以改进管道的执行。最简单的优化叫做'fusion',也就是
在这种情况下,问题是您的 Create
和 BatchElements
以及 ReadGCS
转换都在同一个工作程序中执行,并且没有机会分配给多个工人。
我的建议是您尝试重新整理您的数据。这将使它能够被重新分配。像这样:
xml = (
p1
| "Get xml name" >> beam.Create(gz_names)
| beam.Reshuffle() # This will redistribute your data to other workers
| "Open file" >> beam.ParDo(ReadGCS())
| "Create XML" >> beam.ParDo(CreateXML())
| "Parse Json" >> beam.ParDo(JsonParser())
| "Remove elements" >> beam.ParDo(Filtering())
| "Build Json" >> beam.ParDo(JsonBuilder())
| "Write elements" >> beam.io.WriteToText(file_path_prefix="output12", file_name_suffix=".json")
)
p1.run()
完成后,您的管道应该能够取得更好的进展。
另一个可以使您的管道性能更高的提示:考虑使用 transforms under apache_beam.io.fileio
。它们包括匹配文件名和读取文件的转换,它们是 Beam 中的标准。
如果您想深入了解一些运行程序优化,请查看 FlumeJava paper : )
我在 GCS 上有大约 2000 个文件,我想处理所有文件并将它们上传到 BigQuery。当我使用管道进行处理时,这些管道无法自行完成。它在处理 70% 的文件后失败或停止工作。
我决定每批制作 100 个文件。我使用 BatchElements()
这会在列表中创建一个列表,即二维列表。
这些批次是否要单独执行。意味着第一批的 pcollections 将在第二批开始工作后自行执行(从 GCS 打开文件并将其上传到 BigQuery)。
这是串行工作还是并行工作?
如果任何批次失败,我的工作会失败吗?
xml = (
p1
| "Get xml name" >> beam.Create(gz_names)
| "Batch elements" >> BatchElements(100)
| "Open file" >> beam.ParDo(ReadGCS())
| "Create XML" >> beam.ParDo(CreateXML())
| "Parse Json" >> beam.ParDo(JsonParser())
| "Remove elements" >> beam.ParDo(Filtering())
| "Build Json" >> beam.ParDo(JsonBuilder())
| "Write elements" >> beam.io.WriteToText(file_path_prefix="output12", file_name_suffix=".json")
)
p1.run()
Beam 和其他大数据系统尝试执行优化以改进管道的执行。最简单的优化叫做'fusion',也就是
在这种情况下,问题是您的 Create
和 BatchElements
以及 ReadGCS
转换都在同一个工作程序中执行,并且没有机会分配给多个工人。
我的建议是您尝试重新整理您的数据。这将使它能够被重新分配。像这样:
xml = (
p1
| "Get xml name" >> beam.Create(gz_names)
| beam.Reshuffle() # This will redistribute your data to other workers
| "Open file" >> beam.ParDo(ReadGCS())
| "Create XML" >> beam.ParDo(CreateXML())
| "Parse Json" >> beam.ParDo(JsonParser())
| "Remove elements" >> beam.ParDo(Filtering())
| "Build Json" >> beam.ParDo(JsonBuilder())
| "Write elements" >> beam.io.WriteToText(file_path_prefix="output12", file_name_suffix=".json")
)
p1.run()
完成后,您的管道应该能够取得更好的进展。
另一个可以使您的管道性能更高的提示:考虑使用 transforms under apache_beam.io.fileio
。它们包括匹配文件名和读取文件的转换,它们是 Beam 中的标准。
如果您想深入了解一些运行程序优化,请查看 FlumeJava paper : )