Databricks Spark CREATE TABLE 永远需要 100 万个小 XML 文件

Databricks Spark CREATE TABLE takes forever for 1 million small XML files

我在 Azure Blob 存储中有一组 100 万 XML 个文件,每个文件的大小为 ~14KB,已挂载在 Azure Databricks 中,我正在尝试使用 CREATE TABLE,期望每个文件有一条记录。

实验

文件的内容结构如下图所示。为了简单和性能实验,除了 <ID> 元素之外的所有文件内容都保持相同。

<OBSERVATION>
  <HEADER>...</HEADER>
  <RESULT>
    <ID>...</ID>
    <VALUES>...</VALUES>
  </RESULT>
</OBSERVATION>

对于 parsing/deserialization,我使用的是 Databricks 的 spark-xml。此时此刻,我希望记录有两列 HEADERRESULT,这就是我得到的。

CREATE TABLE Observations
USING XML
OPTIONS (
  path "/mnt/blobstorage/records/*.xml",
  rowTag "RESULT",
  rootTag "OBSERVATION",
  excludeAttribute True
)

问题

CREATE TABLE 语句 运行s 5.5 小时(在 Spark [中具有名称 sql at SQLDriverLocal.scala:87 的 SQL 查询 [= =94=]) 其中只有 1 小时 用于 Spark 作业(在 Spark UI 的作业选项卡中)。

我注意到带有 CREATE TABLE 命令的单元格大部分时间都停留在 Listing files at "/mnt/blobstorage/records/*.xml"。首先,我认为这是存储连接器中的缩放问题。但是,我可以 运行 ~500K JSON 类似大小文件的命令 ~25s (XML vs JSON?).

有问题

我也知道 spark-xml 读取所有文件来推断模式,这可能是瓶颈。为了消除这种可能性,我尝试:

对于 10K 条记录,运行s 在 20s 内,并在 30 分钟内 用于 200K 条记录。使用线性缩放(这显然不会发生),100 万条记录将在 ~33 分钟.

内完成

我的 Databricks 集群有 1 个工作节点和 3 个驱动节点,每个节点有 256 GB RAM 和 64 核心,所以应该有不是缓存瓶颈。我已在 4 天内多次 运行 成功重现该问题。

问题

我在这里做错了什么?如果在 CREATE TABLE 期间我可以做一些分区/集群,我该怎么做?

我的猜测是您 运行 遇到了一个小文件问题,因为您只处理了 15 GB。我会将小文件合并到每个 ca 的大文件中。 250 MB 大小。 由于您的数据集仍然很小,您可以在驱动程序上执行此操作。以下代码显示了在驱动程序节点上进行合并(不考虑最佳文件大小):

1。将文件从 Blob 复制到本地文件系统并生成文件合并脚本:

# copy files from mounted storage to driver local storage
dbutils.fs.cp("dbfs:/mnt/blobstorage/records/", "file:/databricks/driver/temp/records", recurse=True)  

unzipdir= 'temp/records/'
gzipdir= 'temp/gzip/'

# generate shell-script and write it into the local filesystem
script = "cat " + unzipdir + "*.xml > " + gzipdir + """all.xml gzip """ + gzipdir + "all.xml"
dbutils.fs.put("file:/databricks/driver/scripts/makeone.sh", script, True)

2。 运行 shell 脚本

%sh
sudo sh ./scripts/makeone.sh

3。将文件复制回挂载的存储

dbutils.fs.mv("file:/databricks/driver/" + gzipdir, "dbfs:/mnt/mnt/blobstorage/recordsopt/", recurse=True) 

另一个重点是 spark-xml 库采用两步法:

  1. 它解析数据以推断架构。如果参数 samplingRatio 未更改,它将对整个数据集执行此操作。通常只对较小的样本执行此操作就足够了,或者您可以预定义架构(为此使用参数架构),那么您不需要此步骤。
  2. 正在读取数据。

最后,我建议将数据存储在 parquet 中,因此在基于列的格式上进行更复杂的查询,然后直接在 xmls 上进行,并为此使用 spark-xml 库预处理步骤。