如何处理代码存储库中的大文件?

How can I process large files in Code Repositories?

我有一个数据源,每天提供一个大的 .txt 文件 (50-75GB)。该文件中包含多个不同的模式,其中每一行对应一个模式。我想将其拆分为每个模式的分区数据集,我怎样才能有效地做到这一点?

您需要解决的最大问题是恢复架构的迭代速度,这对于这种规模的文件来说可能具有挑战性。

你最好的策略是获取一个示例 'notional' 文件,其中包含你想要恢复的每个模式作为其中的一行,并将其作为一个文件添加到你的存储库中。当您将此文件添加到您的存储库中时(连同您的转换逻辑),您将能够将其推送到数据框中,就像处理数据集中的原始文件一样,以进行快速测试迭代。

首先,确保将 txt 文件指定为包内容的一部分,这样您的测试就会发现它们(这在 Read a file from a Python repository 下的文档中有所介绍):

You can read other files from your repository into the transform context. This might be useful in setting parameters for your transform code to reference.

To start, In your python repository edit setup.py:

setup(
   name=os.environ['PKG_NAME'],
# ...
    package_data={
        '': ['*.txt']
    }
)

我正在使用包含以下内容的 txt 文件:

my_column, my_other_column
some_string,some_other_string
some_thing,some_other_thing,some_final_thing

此文本文件位于我存储库中的以下路径中:transforms-python/src/myproject/datasets/raw.txt

一旦您将文本文件配置为与您的逻辑一起发送,并且将文件本身包含在您的存储库中之后,您就可以包含以下代码。这段代码有几个重要的功能:

  1. 它将原始文件解析逻辑与将文件读入 Spark DataFrame 的阶段完全分开。这样 方式 这个 DataFrame 的构建可以留给测试基础设施,或者留给 运行 时间,这取决于你在哪里 运行ning。
  2. 这种保持逻辑分离的方式让您可以确保您想要进行的实际逐行解析是它自己的可测试函数,而不是让它纯粹存在于您的 my_compute_function
  3. 此代码使用 Spark-native spark_session.read.text 方法,这将比原始 txt 文件的逐行解析快几个数量级。这将确保并行化的 DataFrame 是您操作的对象,而不是您的执行程序(或更糟的是您的驱动程序)中逐行操作的单个文件。
from transforms.api import transform, Input, Output
from pkg_resources import resource_filename


def raw_parsing_logic(raw_df):
    return raw_df


@transform(
    my_output=Output("/txt_tests/parsed_files"),
    my_input=Input("/txt_tests/dataset_of_files"),
)
def my_compute_function(my_input, my_output, ctx):
    all_files_df = None
    for file_status in my_input.filesystem().ls('**/**'):
        raw_df = ctx.spark_session.read.text(my_input.filesystem().hadoop_path + "/" + file_status.path)
        parsed_df = raw_parsing_logic(raw_df)
        all_files_df = parsed_df if all_files_df is None else all_files_df.unionByName(parsed_df)
    my_output.write_dataframe(all_files_df)


def test_my_compute_function(spark_session):
    file_path = resource_filename(__name__, "raw.txt")
    raw_df = raw_parsing_logic(
      spark_session.read.text(file_path)
    )
    assert raw_df.count() > 0
    raw_columns_set = set(raw_df.columns)
    expected_columns_set = {"value"}
    assert len(raw_columns_set.intersection(expected_columns_set)) == 1

一旦您准备好此代码并 运行ning,您的 test_my_compute_function 方法将非常 快速迭代,这样您就可以完善您的模式恢复逻辑。这将使您在最后构建数据集变得更加容易,而且没有任何完整构建的开销。