如何在 Palantir Foundry 中解析 xml 文档?

How do I parse xml documents in Palantir Foundry?

我有一组 .xml 个文档要解析。

我以前曾尝试使用获取文件内容并将它们转储到单个单元格中的方法来解析它们,但是我注意到这在实践中不起作用,因为我看到越来越慢 运行 次,通常一项任务需要花费数十小时 运行:

我的第一个转换采用 .xml 内容并将其放入单个单元格中,第二个转换采用此字符串并使用 Python 的 xml 库来解析字符串到文档中。然后,我可以从 return DataFrame 中提取属性。

我正在使用 UDF 执行将字符串内容映射到我想要的字段的过程。

我怎样才能使它更快/更好地处理大型 .xml 文件?

对于这个问题,我们将结合几种不同的技术来使这段代码既可测试又高度可扩展。

理论

解析原始文件时,您可以考虑几个选项:

  1. ❌ 您可以编写自己的解析器来从文件中读取字节并将其转换为 Spark 可以理解的数据。
    • 由于工程时间和不可扩展的体系结构,强烈建议不要这样做。当您执行此操作时,它不会利用分布式计算,因为您必须先将整个原始文件带到您的解析方法中,然后才能使用它。这不是对您资源的有效利用。
  2. ⚠ 你可以使用你自己的不是为Spark制作的解析器库,比如问题中提到的XML Python库
    • 虽然这比编写自己的解析器更容易实现,但它仍然没有利用 Spark 中的分布式计算。获得一些东西更容易 运行,但它最终会达到性能极限,因为它没有利用仅在编写 Spark 库时才公开的低级 Spark 功能。
  3. ✅ 您可以使用 Spark 原生原始文件解析器
    • 这是所有情况下的首选选项,因为它利用了低级 Spark 功能,并且不需要您编写自己的代码。如果存在低级 Spark 解析器,您应该使用它。

在我们的例子中,我们可以使用 Databricks 解析器来达到很好的效果。

一般来说,您还应该避免使用 .udf 方法,因为它可能被用来代替 Spark API 中已有的良好功能。 UDF 的性能不如本机方法,只有在没有其他选项可用时才应使用。

UDF 掩盖隐藏问题的一个很好的例子是列内容的字符串操作;虽然从技术上讲,您 可以 使用 UDF 来执行拆分和修剪字符串等操作,但这些操作已经存在于 Spark API 中,并且比您自己的代码快几个数量级。

设计

我们的设计将使用以下内容:

  1. 通过 Databricks XML Parser
  2. 完成的低级 Spark 优化文件解析
  3. 测试驱动的原始文件解析如说明

连接解析器

首先,我们需要将 .jar 添加到 Transforms 中可用的 spark_session。由于最近的改进,此参数在配置后将允许您在 Preview/Test 和完整构建时使用 .jar。以前,这需要完整构建,但现在不需要。

我们需要转到 transforms-python/build.gradle 文件并添加 2 个配置块:

  1. 启用 pytest 插件
  2. 启用 condaJars 参数并声明 .jar 依赖项

我的 /transforms-python/build.gradle 现在看起来像下面这样:

buildscript {
    repositories {
       // some other things
    }

    dependencies {
        classpath "com.palantir.transforms.python:lang-python-gradle-plugin:${transformsLangPythonPluginVersion}"
    }
}

apply plugin: 'com.palantir.transforms.lang.python'
apply plugin: 'com.palantir.transforms.lang.python-defaults'

dependencies {
    condaJars "com.databricks:spark-xml_2.13:0.14.0"
}

// Apply the testing plugin
apply plugin: 'com.palantir.transforms.lang.pytest-defaults'

// ... some other awesome features you should enable

应用此配置后,您需要通过单击底部功能区并点击 Refresh

来重新启动您的 Code Assist 会话

刷新 Code Assist 后,我​​们现在可以使用低级功能来解析我们的 .xml 文件,现在我们需要测试它!

测试解析器

如果我们采用与相同的测试驱动开发风格,我们最终会得到/transforms-python/src/myproject/datasets/xml_parse_transform.py,内容如下:

from transforms.api import transform, Output, Input
from transforms.verbs.dataframes import union_many


def read_files(spark_session, paths):
    parsed_dfs = []
    for file_name in paths:
        parsed_df = spark_session.read.format('xml').options(rowTag="tag").load(file_name)
        parsed_dfs += [parsed_df]
    output_df = union_many(*parsed_dfs, how="wide")
    return output_df


@transform(
    the_output=Output("my.awesome.output"),
    the_input=Input("my.awesome.input"),
)
def my_compute_function(the_input, the_output, ctx):
    session = ctx.spark_session
    input_filesystem = the_input.filesystem()
    hadoop_path = input_filesystem.hadoop_path
    files = [hadoop_path + "/" + file_name.path for file_name in input_filesystem.ls()]
    output_df = read_files(session, files)
    the_output.write_dataframe(output_df)

...一个示例文件 /transforms-python/test/myproject/datasets/sample.xml,内容为:

<tag>
<field1>
my_value
</field1>
</tag>

和一个测试文件/transforms-python/test/myproject/datasets/test_xml_parse_transform.py:

from myproject.datasets import xml_parse_transform
from pkg_resources import resource_filename


def test_parse_xml(spark_session):
    file_path = resource_filename(__name__, "sample.xml")
    parsed_df = xml_parse_transform.read_files(spark_session, [file_path])
    assert parsed_df.count() == 1
    assert set(parsed_df.columns) == {"field1"}

我们现在有:

  1. 分布式计算、低级 .xml 高度可扩展的解析器
  2. 我们可以快速迭代以获得正确功能的测试驱动设置

干杯