使用 lxml.etree 从 HDFS 读取 XML 文件以在 Pyspark 中解析

Read XML file from HDFS to parse in Pyspark with lxml.etree

我已经使用 lxml.etree 在 Python 中编写了一个解析器,现在我正尝试 运行 在 Hadoop 集群上使用所述解析器。当我 运行 该函数在本地按预期工作时,但是当我尝试将它应用于集群上的文件时收到以下错误(我在 Pyspark shell、python3)

xml_pathname = "hdfs://file_path/date_directory/example_one.xml"
xml_tree = etree.parse(xml_pathname)

OSError: Error reading file '/file_path/date_directory/example_one.xml': failed to load external entity 
"/file_path/date_directory/example_one.xml"

我在终端运行 hdfs dfs -ls /file_path/date_directory/example_one.xml 时可以看到文件。

我希望得到帮助的两个方面 -

  1. 如何使用 Pyspark 从集群中将 XML 文件加载到 lxml.etree.parse() 方法中?
  2. 如何在 Spark 上有效地将其扩展到 运行?我想使用我的 Python 解析器解析集群上的数百万个 XML 文件——下面的修改是否有效,或者是否有更好的方法来大规模并行化和 运行 解析器?通常,我应该如何在我的 spark 配置中设置参数以获得最佳结果(大量执行程序、多个驱动程序等)?
#Same as above but with wildcards to parse millions of XML files

xml_pathname = "hdfs://file_path/*/*.xml"
xml_tree = etree.parse(xml_pathname)

已经为此工作了一段时间,非常感谢任何和所有的帮助。感谢你们

  1. mapValues() 函数证明很有用。 Sark 配置的 XML 解析器,例如 Pubmed 解析器,也提供了有用的样板代码,例如:
path_rdd = sc.parallelize(path_sample, numSlices=10000) # use only example path
    parse_results_rdd = path_rdd.map(lambda x: Row(file_name=os.path.basename(x), **pp.parse_pubmed_xml(x)))
    pubmed_oa_df = parse_results_rdd.toDF()
    pubmed_oa_df_sel = pubmed_oa_df[['full_title', 'abstract', 'doi',
                                     'file_name', 'pmc', 'pmid',
                                     'publication_year', 'publisher_id',
                                     'journal', 'subjects']]
    pubmed_oa_df_sel.write.parquet(os.path.join(save_dir, 'pubmed_oa_%s.parquet' % date_update_str),
                                   mode='overwrite')

https://github.com/titipata/pubmed_parser/blob/master/scripts/pubmed_oa_spark.py

  1. 使用 fs.globStatus 允许在一个子目录中检索多个 XML 文件。