Databricks Spark CREATE TABLE 永远需要 100 万个小 XML 文件
Databricks Spark CREATE TABLE takes forever for 1 million small XML files
我在 Azure Blob 存储中有一组 100 万 XML 个文件,每个文件的大小为 ~14KB,已挂载在 Azure Databricks 中,我正在尝试使用 CREATE TABLE
,期望每个文件有一条记录。
实验
文件的内容结构如下图所示。为了简单和性能实验,除了 <ID>
元素之外的所有文件内容都保持相同。
<OBSERVATION>
<HEADER>...</HEADER>
<RESULT>
<ID>...</ID>
<VALUES>...</VALUES>
</RESULT>
</OBSERVATION>
对于 parsing/deserialization,我使用的是 Databricks 的 spark-xml。此时此刻,我希望记录有两列 HEADER
和 RESULT
,这就是我得到的。
CREATE TABLE Observations
USING XML
OPTIONS (
path "/mnt/blobstorage/records/*.xml",
rowTag "RESULT",
rootTag "OBSERVATION",
excludeAttribute True
)
问题
CREATE TABLE
语句 运行s 5.5 小时(在 Spark [中具有名称 sql at SQLDriverLocal.scala:87
的 SQL 查询 [= =94=]) 其中只有 1 小时 用于 Spark 作业(在 Spark UI 的作业选项卡中)。
我注意到带有 CREATE TABLE
命令的单元格大部分时间都停留在 Listing files at "/mnt/blobstorage/records/*.xml"
。首先,我认为这是存储连接器中的缩放问题。但是,我可以 运行 ~500K JSON 类似大小文件的命令 ~25s (XML vs JSON?).
有问题
我也知道 spark-xml
读取所有文件来推断模式,这可能是瓶颈。为了消除这种可能性,我尝试:
- 预定义模式(仅来自第一个 XML 文件)
- 在不解析的情况下以纯文本形式摄取(使用
TEXT
提供程序)。
同样的问题在这两种情况下仍然存在。
对于 10K 条记录,运行s 在 20s 内,并在 30 分钟内 用于 200K 条记录。使用线性缩放(这显然不会发生),100 万条记录将在 ~33 分钟.
内完成
我的 Databricks 集群有 1 个工作节点和 3 个驱动节点,每个节点有 256 GB RAM 和 64 核心,所以应该有不是缓存瓶颈。我已在 4 天内多次 运行 成功重现该问题。
问题
我在这里做错了什么?如果在 CREATE TABLE
期间我可以做一些分区/集群,我该怎么做?
我的猜测是您 运行 遇到了一个小文件问题,因为您只处理了 15 GB。我会将小文件合并到每个 ca 的大文件中。 250 MB 大小。
由于您的数据集仍然很小,您可以在驱动程序上执行此操作。以下代码显示了在驱动程序节点上进行合并(不考虑最佳文件大小):
1。将文件从 Blob 复制到本地文件系统并生成文件合并脚本:
# copy files from mounted storage to driver local storage
dbutils.fs.cp("dbfs:/mnt/blobstorage/records/", "file:/databricks/driver/temp/records", recurse=True)
unzipdir= 'temp/records/'
gzipdir= 'temp/gzip/'
# generate shell-script and write it into the local filesystem
script = "cat " + unzipdir + "*.xml > " + gzipdir + """all.xml gzip """ + gzipdir + "all.xml"
dbutils.fs.put("file:/databricks/driver/scripts/makeone.sh", script, True)
2。 运行 shell 脚本
%sh
sudo sh ./scripts/makeone.sh
3。将文件复制回挂载的存储
dbutils.fs.mv("file:/databricks/driver/" + gzipdir, "dbfs:/mnt/mnt/blobstorage/recordsopt/", recurse=True)
另一个重点是 spark-xml 库采用两步法:
- 它解析数据以推断架构。如果参数 samplingRatio 未更改,它将对整个数据集执行此操作。通常只对较小的样本执行此操作就足够了,或者您可以预定义架构(为此使用参数架构),那么您不需要此步骤。
- 正在读取数据。
最后,我建议将数据存储在 parquet 中,因此在基于列的格式上进行更复杂的查询,然后直接在 xmls 上进行,并为此使用 spark-xml 库预处理步骤。
我在 Azure Blob 存储中有一组 100 万 XML 个文件,每个文件的大小为 ~14KB,已挂载在 Azure Databricks 中,我正在尝试使用 CREATE TABLE
,期望每个文件有一条记录。
实验
文件的内容结构如下图所示。为了简单和性能实验,除了 <ID>
元素之外的所有文件内容都保持相同。
<OBSERVATION>
<HEADER>...</HEADER>
<RESULT>
<ID>...</ID>
<VALUES>...</VALUES>
</RESULT>
</OBSERVATION>
对于 parsing/deserialization,我使用的是 Databricks 的 spark-xml。此时此刻,我希望记录有两列 HEADER
和 RESULT
,这就是我得到的。
CREATE TABLE Observations
USING XML
OPTIONS (
path "/mnt/blobstorage/records/*.xml",
rowTag "RESULT",
rootTag "OBSERVATION",
excludeAttribute True
)
问题
CREATE TABLE
语句 运行s 5.5 小时(在 Spark [中具有名称 sql at SQLDriverLocal.scala:87
的 SQL 查询 [= =94=]) 其中只有 1 小时 用于 Spark 作业(在 Spark UI 的作业选项卡中)。
我注意到带有 CREATE TABLE
命令的单元格大部分时间都停留在 Listing files at "/mnt/blobstorage/records/*.xml"
。首先,我认为这是存储连接器中的缩放问题。但是,我可以 运行 ~500K JSON 类似大小文件的命令 ~25s (XML vs JSON?).
我也知道 spark-xml
读取所有文件来推断模式,这可能是瓶颈。为了消除这种可能性,我尝试:
- 预定义模式(仅来自第一个 XML 文件)
- 在不解析的情况下以纯文本形式摄取(使用
TEXT
提供程序)。 同样的问题在这两种情况下仍然存在。
对于 10K 条记录,运行s 在 20s 内,并在 30 分钟内 用于 200K 条记录。使用线性缩放(这显然不会发生),100 万条记录将在 ~33 分钟.
内完成我的 Databricks 集群有 1 个工作节点和 3 个驱动节点,每个节点有 256 GB RAM 和 64 核心,所以应该有不是缓存瓶颈。我已在 4 天内多次 运行 成功重现该问题。
问题
我在这里做错了什么?如果在 CREATE TABLE
期间我可以做一些分区/集群,我该怎么做?
我的猜测是您 运行 遇到了一个小文件问题,因为您只处理了 15 GB。我会将小文件合并到每个 ca 的大文件中。 250 MB 大小。 由于您的数据集仍然很小,您可以在驱动程序上执行此操作。以下代码显示了在驱动程序节点上进行合并(不考虑最佳文件大小):
1。将文件从 Blob 复制到本地文件系统并生成文件合并脚本:
# copy files from mounted storage to driver local storage
dbutils.fs.cp("dbfs:/mnt/blobstorage/records/", "file:/databricks/driver/temp/records", recurse=True)
unzipdir= 'temp/records/'
gzipdir= 'temp/gzip/'
# generate shell-script and write it into the local filesystem
script = "cat " + unzipdir + "*.xml > " + gzipdir + """all.xml gzip """ + gzipdir + "all.xml"
dbutils.fs.put("file:/databricks/driver/scripts/makeone.sh", script, True)
2。 运行 shell 脚本
%sh
sudo sh ./scripts/makeone.sh
3。将文件复制回挂载的存储
dbutils.fs.mv("file:/databricks/driver/" + gzipdir, "dbfs:/mnt/mnt/blobstorage/recordsopt/", recurse=True)
另一个重点是 spark-xml 库采用两步法:
- 它解析数据以推断架构。如果参数 samplingRatio 未更改,它将对整个数据集执行此操作。通常只对较小的样本执行此操作就足够了,或者您可以预定义架构(为此使用参数架构),那么您不需要此步骤。
- 正在读取数据。
最后,我建议将数据存储在 parquet 中,因此在基于列的格式上进行更复杂的查询,然后直接在 xmls 上进行,并为此使用 spark-xml 库预处理步骤。