使用 pyspark 处理大量小 json 文件

Dealing with large number of small json files using pyspark

我在 S3 的目录下有大约 376KJSON 个文件。这些文件每个 2.5 KB,只包含一个 record/file。当我尝试通过以下代码通过 Glue ETL20 workers:

加载整个目录时
spark.read.json("path")  

只是没有 运行。 5 小时后出现 Timeout。所以,我开发了 运行 一个 shell script 来将这些文件的记录合并在一个文件下,当我尝试加载它时,它只显示一条记录。合并后的文件大小为 980 MB。在将这 4 条记录合并到一个文件下后,在本地测试时,它对 4 条记录运行良好。它按预期显示了 4 条记录。

我使用以下命令将来自不同文件的 JSON 条记录附加到一个文件下:

for f in Agent/*.txt; do cat ${f} >> merged.json;done;  

它没有任何嵌套 JSON。我什至尝试了 multiline 选项,但没有用。那么,在这种情况下可以做什么?根据我的说法,合并时它不会单独处理记录,因此会导致问题。我什至尝试 head -n 10 显示 top 10 lines 但它会进入无限循环。

我在过去处理数千个小文件时遇到过 运行 麻烦。在我的例子中,他们的 csv 文件不是 json。我尝试调试的其中一件事是创建一个 for 循环并加载较小的批次,然后将所有数据框组合在一起。在每次迭代期间,我都会调用一个动作来强制执行。我会记录进度以了解它正在取得进展。并监控它是如何随着工作的进行而变慢的

问题出在我的 shell 脚本上,该脚本用于合并多个小文件。 Post 合并,记录未正确对齐,因此它们未被视为单独的记录。

因为我处理的是 JSON 数据集,所以我使用了 jq 实用程序来处理它。下面是 shell 脚本,它将 faster way 中的大量记录合并到一个文件中:

find . -name '*.txt' -exec cat '{}' + | jq -s '.' > output.txt  

稍后,我能够使用以下代码按预期加载 JSON 记录:

spark.read.option("multiline","true").json("path")