如何在 Foundry 中解析大型压缩 csv 文件?
How do I parse large compressed csv files in Foundry?
我有一个大的 gzip 压缩 csv 文件 (.csv.gz) 上传到一个数据集,该文件大小约为 14GB,未压缩时为 40GB。有没有一种方法可以使用 Python 转换将其解压缩、读取和写入数据集,而不会导致执行程序 OOM?
我将协调一些策略来回答这个问题。
首先,我想使用讨论的方法 使用测试驱动开发来编写这篇文章,因为我们正在处理原始文件。使用完整检查 + 构建的原始文件的迭代速度会太长,所以我将首先创建一个示例 .csv
文件并压缩它以加快开发速度。
我的示例 .csv
文件如下所示:
然后我使用命令行实用程序压缩它,并通过将存储库克隆到我的本地计算机、将文件添加到我的开发分支并将结果推送回我的 Foundry 实例来将其添加到我的代码存储库。
我还在我的存储库中创建了一个 test
目录,因为我想确保我的解析逻辑得到正确验证。
这导致我的存储库如下所示:
提示:不要忘记修改您的 setup.py
和 build.gradle
文件以启用测试并专门打包您的小测试文件。
我还需要让我的解析逻辑位于我的 my_compute_function
方法之外,以便它可供我的测试方法使用,因此 parse_gzip.py
如下所示:
from transforms.api import transform, Output, Input
from transforms.verbs.dataframes import union_many
def read_files(spark_session, paths):
parsed_dfs = []
for file_name in paths:
parsed_df = spark_session.read.option("header", "true").csv(file_name)
parsed_dfs += [parsed_df]
output_df = union_many(*parsed_dfs)
return output_df
@transform(
the_output=Output("ri.foundry.main.dataset.my-awesome-output"),
the_input=Input("ri.foundry.main.dataset.my-awesome-input"),
)
def my_compute_function(the_input, the_output, ctx):
session = ctx.spark_session
input_filesystem = the_input.filesystem()
hadoop_path = input_filesystem.hadoop_path
files = [hadoop_path + file_status.path for file_status in input_filesystem.ls('**/*.csv.gz')]
output_df = read_files(session, files)
the_output.write_dataframe(output_df)
因此,我的 test_gzip_csv.py
文件如下所示:
from myproject.datasets import parse_gzip
from pkg_resources import resource_filename
def test_compressed_csv(spark_session):
file_path = resource_filename(__name__, "test.csv.gz")
parsed_df = parse_gzip.read_files(spark_session, [file_path])
assert parsed_df.count() == 1
assert set(parsed_df.columns) == {"col_1", "col_2"}
重要的是要在此处看到此方法不使用 .files()
调用文件系统,它使用 .ls()
方法获取文件名的迭代器。在这种情况下,这是故意完成的,因为我们不需要在执行程序中解析文件本身;我们只需要使用 Spark 的原生方法 .csv
来使用现有功能解析压缩文件。
GZip 文件实际上是可拆分的,Spark 自己读取这些文件的方法将是最佳的,而不是编写您自己的解压缩器/.csv 解析器。如果您尝试解压缩它们并手动解析它们,您将面临 OOMing 作业的风险,并且需要投入更多内存才能成功。在您同时运营的规模下,建议不要在 Python 中处理单个文件,因为它的性能将无法与 Spark 相媲美。
请注意,我在这里也使用 transforms.verbs.dataframes.union_many
方法来优雅地处理具有不同架构的不同文件。您可以指定 'narrow'、'wide' 和 'strict' 选项来处理不同模式的情况,请参阅最适合您需求的产品文档。
有 2 种推荐的方法可以将包含原始 CSV 文件的数据集解析为 Foundry 中的表格数据集。
1.只需添加一个架构!
Foundry 能够原生理解包含(压缩的)csv 文件的数据集。所以在某些情况下,首先不需要解析作业。
我们只需要告诉 Foundry 我们打算将此数据集解释为表格数据,而不是通用的文件桶。
简单导航到数据集预览页面,然后单击右上角的“应用模式”按钮。
Foundry 将对架构做出最好的猜测,但如果没有您的帮助,它并不总是能很好地完成工作,因此您可能会得到名为 untitled_column_1
的列,或者使用错误的数据类型等。这些可以通过单击应用程序左侧信息窗格中的“编辑架构”并使用架构编辑器对话框手动清理。
注意:这不是用于生产管道的可靠解决方案。如果新的 CSV 被添加到不符合模式的数据集,Foundry 将不知道,下游作业将无法读取数据集。
2。在变换中
我们可以使用 spark 来推断 Foundry 转换作业中 csv 文件的模式。
有关详细信息,请参阅 official platform docs on inferring a schema。
同时复制代码片段在这里供后代使用:
from transforms.api import transform, Input, Output
from transforms.verbs.dataframes import sanitize_schema_for_parquet
@transform(
output=Output("/Company/sourceA/parsed/data"),
raw=Input("/Company/sourceA/raw/data_csv"),
)
def read_csv(ctx, raw, output):
filesystem = raw.filesystem()
hadoop_path = filesystem.hadoop_path
files = [f"{hadoop_path}/{f.path}" for f in filesystem.ls()]
df = (
ctx
.spark_session
.read
.option("encoding", "UTF-8") # UTF-8 is the default
.option("header", True)
.option("inferSchema", True)
.csv(files)
)
output.write_dataframe(sanitize_schema_for_parquet(df))
请注意,这比当前接受的 SO 答案更有效,因为它将所有文件路径一次传递到 spark.read.csv()
,而不是一次一个,然后执行合并。
注意:这是一个更强大的生产管道解决方案,因为如果 csv 数据发生变化,架构将自动更新。但如果模式意外更改,下游数据集仍可能失败。如果数据包含具有已知模式的文件,我建议关闭 spark.read
的 inferSchema
选项并显式传递模式,或者使用 schema expectation 使构建失败,或者通知如果架构意外更改,您将收到通知。
放在一边。关于压缩编解码器的注释
首先,一个警告:与当前接受的答案中提到的相反,.csv.gz
文件 不可拆分 。 Spark 将只能在一个执行节点上顺序处理您的 40GB 数据。
如果您有非常大的单个文件,例如这个文件,我建议您使用可拆分压缩编解码器,例如 bzip2 或 lz4 .这样 spark 自然可以并行处理大文件。
注意:这与使用 gzip 压缩的 parquet 文件不同,后者 是 可拆分的。这是因为无论使用何种压缩编解码器,parquet 格式本身都被设计为可拆分。
我有一个大的 gzip 压缩 csv 文件 (.csv.gz) 上传到一个数据集,该文件大小约为 14GB,未压缩时为 40GB。有没有一种方法可以使用 Python 转换将其解压缩、读取和写入数据集,而不会导致执行程序 OOM?
我将协调一些策略来回答这个问题。
首先,我想使用讨论的方法 .csv
文件并压缩它以加快开发速度。
我的示例 .csv
文件如下所示:
然后我使用命令行实用程序压缩它,并通过将存储库克隆到我的本地计算机、将文件添加到我的开发分支并将结果推送回我的 Foundry 实例来将其添加到我的代码存储库。
我还在我的存储库中创建了一个 test
目录,因为我想确保我的解析逻辑得到正确验证。
这导致我的存储库如下所示:
提示:不要忘记修改您的 setup.py
和 build.gradle
文件以启用测试并专门打包您的小测试文件。
我还需要让我的解析逻辑位于我的 my_compute_function
方法之外,以便它可供我的测试方法使用,因此 parse_gzip.py
如下所示:
from transforms.api import transform, Output, Input
from transforms.verbs.dataframes import union_many
def read_files(spark_session, paths):
parsed_dfs = []
for file_name in paths:
parsed_df = spark_session.read.option("header", "true").csv(file_name)
parsed_dfs += [parsed_df]
output_df = union_many(*parsed_dfs)
return output_df
@transform(
the_output=Output("ri.foundry.main.dataset.my-awesome-output"),
the_input=Input("ri.foundry.main.dataset.my-awesome-input"),
)
def my_compute_function(the_input, the_output, ctx):
session = ctx.spark_session
input_filesystem = the_input.filesystem()
hadoop_path = input_filesystem.hadoop_path
files = [hadoop_path + file_status.path for file_status in input_filesystem.ls('**/*.csv.gz')]
output_df = read_files(session, files)
the_output.write_dataframe(output_df)
因此,我的 test_gzip_csv.py
文件如下所示:
from myproject.datasets import parse_gzip
from pkg_resources import resource_filename
def test_compressed_csv(spark_session):
file_path = resource_filename(__name__, "test.csv.gz")
parsed_df = parse_gzip.read_files(spark_session, [file_path])
assert parsed_df.count() == 1
assert set(parsed_df.columns) == {"col_1", "col_2"}
重要的是要在此处看到此方法不使用 .files()
调用文件系统,它使用 .ls()
方法获取文件名的迭代器。在这种情况下,这是故意完成的,因为我们不需要在执行程序中解析文件本身;我们只需要使用 Spark 的原生方法 .csv
来使用现有功能解析压缩文件。
GZip 文件实际上是可拆分的,Spark 自己读取这些文件的方法将是最佳的,而不是编写您自己的解压缩器/.csv 解析器。如果您尝试解压缩它们并手动解析它们,您将面临 OOMing 作业的风险,并且需要投入更多内存才能成功。在您同时运营的规模下,建议不要在 Python 中处理单个文件,因为它的性能将无法与 Spark 相媲美。
请注意,我在这里也使用 transforms.verbs.dataframes.union_many
方法来优雅地处理具有不同架构的不同文件。您可以指定 'narrow'、'wide' 和 'strict' 选项来处理不同模式的情况,请参阅最适合您需求的产品文档。
有 2 种推荐的方法可以将包含原始 CSV 文件的数据集解析为 Foundry 中的表格数据集。
1.只需添加一个架构!
Foundry 能够原生理解包含(压缩的)csv 文件的数据集。所以在某些情况下,首先不需要解析作业。
我们只需要告诉 Foundry 我们打算将此数据集解释为表格数据,而不是通用的文件桶。
简单导航到数据集预览页面,然后单击右上角的“应用模式”按钮。
Foundry 将对架构做出最好的猜测,但如果没有您的帮助,它并不总是能很好地完成工作,因此您可能会得到名为 untitled_column_1
的列,或者使用错误的数据类型等。这些可以通过单击应用程序左侧信息窗格中的“编辑架构”并使用架构编辑器对话框手动清理。
注意:这不是用于生产管道的可靠解决方案。如果新的 CSV 被添加到不符合模式的数据集,Foundry 将不知道,下游作业将无法读取数据集。
2。在变换中
我们可以使用 spark 来推断 Foundry 转换作业中 csv 文件的模式。
有关详细信息,请参阅 official platform docs on inferring a schema。
同时复制代码片段在这里供后代使用:
from transforms.api import transform, Input, Output
from transforms.verbs.dataframes import sanitize_schema_for_parquet
@transform(
output=Output("/Company/sourceA/parsed/data"),
raw=Input("/Company/sourceA/raw/data_csv"),
)
def read_csv(ctx, raw, output):
filesystem = raw.filesystem()
hadoop_path = filesystem.hadoop_path
files = [f"{hadoop_path}/{f.path}" for f in filesystem.ls()]
df = (
ctx
.spark_session
.read
.option("encoding", "UTF-8") # UTF-8 is the default
.option("header", True)
.option("inferSchema", True)
.csv(files)
)
output.write_dataframe(sanitize_schema_for_parquet(df))
请注意,这比当前接受的 SO 答案更有效,因为它将所有文件路径一次传递到 spark.read.csv()
,而不是一次一个,然后执行合并。
注意:这是一个更强大的生产管道解决方案,因为如果 csv 数据发生变化,架构将自动更新。但如果模式意外更改,下游数据集仍可能失败。如果数据包含具有已知模式的文件,我建议关闭 spark.read
的 inferSchema
选项并显式传递模式,或者使用 schema expectation 使构建失败,或者通知如果架构意外更改,您将收到通知。
放在一边。关于压缩编解码器的注释
首先,一个警告:与当前接受的答案中提到的相反,.csv.gz
文件 不可拆分 。 Spark 将只能在一个执行节点上顺序处理您的 40GB 数据。
如果您有非常大的单个文件,例如这个文件,我建议您使用可拆分压缩编解码器,例如 bzip2 或 lz4 .这样 spark 自然可以并行处理大文件。
注意:这与使用 gzip 压缩的 parquet 文件不同,后者 是 可拆分的。这是因为无论使用何种压缩编解码器,parquet 格式本身都被设计为可拆分。