如何在 Foundry 中解析大型压缩 csv 文件?

How do I parse large compressed csv files in Foundry?

我有一个大的 gzip 压缩 csv 文件 (.csv.gz) 上传到一个数据集,该文件大小约为 14GB,未压缩时为 40GB。有没有一种方法可以使用 Python 转换将其解压缩、读取和写入数据集,而不会导致执行程序 OOM?

我将协调一些策略来回答这个问题。

首先,我想使用讨论的方法 使用测试驱动开发来编写这篇文章,因为我们正在处理原始文件。使用完整检查 + 构建的原始文件的迭代速度会太长,所以我将首先创建一个示例 .csv 文件并压缩它以加快开发速度。

我的示例 .csv 文件如下所示:

然后我使用命令行实用程序压缩它,并通过将存储库克隆到我的本地计算机、将文件添加到我的开发分支并将结果推送回我的 Foundry 实例来将其添加到我的代码存储库。

我还在我的存储库中创建了一个 test 目录,因为我想确保我的解析逻辑得到正确验证。

这导致我的存储库如下所示:

提示:不要忘记修改您的 setup.pybuild.gradle 文件以启用测试并专门打包您的小测试文件。

我还需要让我的解析逻辑位于我的 my_compute_function 方法之外,以便它可供我的测试方法使用,因此 parse_gzip.py 如下所示:

from transforms.api import transform, Output, Input
from transforms.verbs.dataframes import union_many


def read_files(spark_session, paths):
    parsed_dfs = []
    for file_name in paths:
        parsed_df = spark_session.read.option("header", "true").csv(file_name)
        parsed_dfs += [parsed_df]
    output_df = union_many(*parsed_dfs)
    return output_df


@transform(
    the_output=Output("ri.foundry.main.dataset.my-awesome-output"),
    the_input=Input("ri.foundry.main.dataset.my-awesome-input"),
)
def my_compute_function(the_input, the_output, ctx):
    session = ctx.spark_session
    input_filesystem = the_input.filesystem()
    hadoop_path = input_filesystem.hadoop_path
    files = [hadoop_path + file_status.path for file_status in input_filesystem.ls('**/*.csv.gz')]
    output_df = read_files(session, files)
    the_output.write_dataframe(output_df)

因此,我的 test_gzip_csv.py 文件如下所示:

from myproject.datasets import parse_gzip
from pkg_resources import resource_filename


def test_compressed_csv(spark_session):
    file_path = resource_filename(__name__, "test.csv.gz")
    parsed_df = parse_gzip.read_files(spark_session, [file_path])
    assert parsed_df.count() == 1
    assert set(parsed_df.columns) == {"col_1", "col_2"}

重要的是要在此处看到此方法不使用 .files() 调用文件系统,它使用 .ls() 方法获取文件名的迭代器。在这种情况下,这是故意完成的,因为我们不需要在执行程序中解析文件本身;我们只需要使用 Spark 的原生方法 .csv 来使用现有功能解析压缩文件。

GZip 文件实际上是可拆分的,Spark 自己读取这些文件的方法将是最佳的,而不是编写您自己的解压缩器/.csv 解析器。如果您尝试解压缩它们并手动解析它们,您将面临 OOMing 作业的风险,并且需要投入更多内存才能成功。在您同时运营的规模下,建议不要在 Python 中处理单个文件,因为它的性能将无法与 Spark 相媲美。

请注意,我在这里也使用 transforms.verbs.dataframes.union_many 方法来优雅地处理具有不同架构的不同文件。您可以指定 'narrow'、'wide' 和 'strict' 选项来处理不同模式的情况,请参阅最适合您需求的产品文档。

有 2 种推荐的方法可以将包含原始 CSV 文件的数据集解析为 Foundry 中的表格数据集。

1.只需添加一个架构!

Foundry 能够原生理解包含(压缩的)csv 文件的数据集。所以在某些情况下,首先不需要解析作业。

我们只需要告诉 Foundry 我们打算将此数据集解释为表格数据,而不是通用的文件桶。

简单导航到数据集预览页面,然后单击右上角的“应用模式”按钮。

Foundry 将对架构做出最好的猜测,但如果没有您的帮助,它并不总是能很好地完成工作,因此您可能会得到名为 untitled_column_1 的列,或者使用错误的数据类型等。这些可以通过单击应用程序左侧信息窗格中的“编辑架构”并使用架构编辑器对话框手动清理。

注意:这不是用于生产管道的可靠解决方案。如果新的 CSV 被添加到不符合模式的数据集,Foundry 将不知道,下游作业将无法读取数据集。

2。在变换中

我们可以使用 spark 来推断 Foundry 转换作业中 csv 文件的模式。

有关详细信息,请参阅 official platform docs on inferring a schema

同时复制代码片段在这里供后代使用:

from transforms.api import transform, Input, Output
from transforms.verbs.dataframes import sanitize_schema_for_parquet

@transform(
    output=Output("/Company/sourceA/parsed/data"),
    raw=Input("/Company/sourceA/raw/data_csv"),
)
def read_csv(ctx, raw, output):
    filesystem = raw.filesystem()
    hadoop_path = filesystem.hadoop_path
    files = [f"{hadoop_path}/{f.path}" for f in filesystem.ls()]
    df = (
        ctx
        .spark_session
        .read
        .option("encoding", "UTF-8")  # UTF-8 is the default
        .option("header", True)
        .option("inferSchema", True)
        .csv(files)
    )
    output.write_dataframe(sanitize_schema_for_parquet(df))

请注意,这比当前接受的 SO 答案更有效,因为它将所有文件路径一次传递到 spark.read.csv(),而不是一次一个,然后执行合并。

注意:这是一个更强大的生产管道解决方案,因为如果 csv 数据发生变化,架构将自动更新。但如果模式意外更改,下游数据集仍可能失败。如果数据包含具有已知模式的文件,我建议关闭 spark.readinferSchema 选项并显式传递模式,或者使用 schema expectation 使构建失败,或者通知如果架构意外更改,您将收到通知。

放在一边。关于压缩编解码器的注释

首先,一个警告:与当前接受的答案中提到的相反,.csv.gz 文件 不可拆分 。 Spark 将只能在一个执行节点上顺序处理您的 40GB 数据。

如果您有非常大的单个文件,例如这个文件,我建议您使用可拆分压缩编解码器,例如 bzip2lz4 .这样 spark 自然可以并行处理大文件。

注意:这与使用 gzip 压缩的 parquet 文件不同,后者 可拆分的。这是因为无论使用何种压缩编解码器,parquet 格式本身都被设计为可拆分。