使用 lxml.etree 从 HDFS 读取 XML 文件以在 Pyspark 中解析
Read XML file from HDFS to parse in Pyspark with lxml.etree
我已经使用 lxml.etree 在 Python 中编写了一个解析器,现在我正尝试 运行 在 Hadoop 集群上使用所述解析器。当我 运行 该函数在本地按预期工作时,但是当我尝试将它应用于集群上的文件时收到以下错误(我在 Pyspark shell、python3)
xml_pathname = "hdfs://file_path/date_directory/example_one.xml"
xml_tree = etree.parse(xml_pathname)
OSError: Error reading file '/file_path/date_directory/example_one.xml': failed to load external entity
"/file_path/date_directory/example_one.xml"
我在终端运行 hdfs dfs -ls /file_path/date_directory/example_one.xml
时可以看到文件。
我希望得到帮助的两个方面 -
- 如何使用 Pyspark 从集群中将 XML 文件加载到 lxml.etree.parse() 方法中?
- 如何在 Spark 上有效地将其扩展到 运行?我想使用我的 Python 解析器解析集群上的数百万个 XML 文件——下面的修改是否有效,或者是否有更好的方法来大规模并行化和 运行 解析器?通常,我应该如何在我的 spark 配置中设置参数以获得最佳结果(大量执行程序、多个驱动程序等)?
#Same as above but with wildcards to parse millions of XML files
xml_pathname = "hdfs://file_path/*/*.xml"
xml_tree = etree.parse(xml_pathname)
已经为此工作了一段时间,非常感谢任何和所有的帮助。感谢你们
- mapValues() 函数证明很有用。 Sark 配置的 XML 解析器,例如 Pubmed 解析器,也提供了有用的样板代码,例如:
path_rdd = sc.parallelize(path_sample, numSlices=10000) # use only example path
parse_results_rdd = path_rdd.map(lambda x: Row(file_name=os.path.basename(x), **pp.parse_pubmed_xml(x)))
pubmed_oa_df = parse_results_rdd.toDF()
pubmed_oa_df_sel = pubmed_oa_df[['full_title', 'abstract', 'doi',
'file_name', 'pmc', 'pmid',
'publication_year', 'publisher_id',
'journal', 'subjects']]
pubmed_oa_df_sel.write.parquet(os.path.join(save_dir, 'pubmed_oa_%s.parquet' % date_update_str),
mode='overwrite')
https://github.com/titipata/pubmed_parser/blob/master/scripts/pubmed_oa_spark.py
- 使用 fs.globStatus 允许在一个子目录中检索多个 XML 文件。
我已经使用 lxml.etree 在 Python 中编写了一个解析器,现在我正尝试 运行 在 Hadoop 集群上使用所述解析器。当我 运行 该函数在本地按预期工作时,但是当我尝试将它应用于集群上的文件时收到以下错误(我在 Pyspark shell、python3)
xml_pathname = "hdfs://file_path/date_directory/example_one.xml"
xml_tree = etree.parse(xml_pathname)
OSError: Error reading file '/file_path/date_directory/example_one.xml': failed to load external entity
"/file_path/date_directory/example_one.xml"
我在终端运行 hdfs dfs -ls /file_path/date_directory/example_one.xml
时可以看到文件。
我希望得到帮助的两个方面 -
- 如何使用 Pyspark 从集群中将 XML 文件加载到 lxml.etree.parse() 方法中?
- 如何在 Spark 上有效地将其扩展到 运行?我想使用我的 Python 解析器解析集群上的数百万个 XML 文件——下面的修改是否有效,或者是否有更好的方法来大规模并行化和 运行 解析器?通常,我应该如何在我的 spark 配置中设置参数以获得最佳结果(大量执行程序、多个驱动程序等)?
#Same as above but with wildcards to parse millions of XML files
xml_pathname = "hdfs://file_path/*/*.xml"
xml_tree = etree.parse(xml_pathname)
已经为此工作了一段时间,非常感谢任何和所有的帮助。感谢你们
- mapValues() 函数证明很有用。 Sark 配置的 XML 解析器,例如 Pubmed 解析器,也提供了有用的样板代码,例如:
path_rdd = sc.parallelize(path_sample, numSlices=10000) # use only example path
parse_results_rdd = path_rdd.map(lambda x: Row(file_name=os.path.basename(x), **pp.parse_pubmed_xml(x)))
pubmed_oa_df = parse_results_rdd.toDF()
pubmed_oa_df_sel = pubmed_oa_df[['full_title', 'abstract', 'doi',
'file_name', 'pmc', 'pmid',
'publication_year', 'publisher_id',
'journal', 'subjects']]
pubmed_oa_df_sel.write.parquet(os.path.join(save_dir, 'pubmed_oa_%s.parquet' % date_update_str),
mode='overwrite')
https://github.com/titipata/pubmed_parser/blob/master/scripts/pubmed_oa_spark.py
- 使用 fs.globStatus 允许在一个子目录中检索多个 XML 文件。