Apache Beam python 无法解析 pubmed XML
Apache Beam python fails to parse pubmed XML
您好,我已经编写了一个 beam 管道来读取目录并使用 pubmed_parse 库解析下载的 pubmed xml 文件。该库通过标准 python 程序运行良好,但如果我将其转换为 apache beam 管道,如下所示,ti 在解析期间失败并出现错误:
希望能帮助解决这个问题
File "/home/micdsouz/venv/medline/data-preprocessing.py", line 19, in process
pubmed_dict = pp.parse_pubmed_xml(element)
File "/home/micdsouz/venv/local/lib/python2.7/site-packages/pubmed_parser/pubmed_oa_parser.py", line 112, in parse_pubmed_xml
dict_article_meta = parse_article_meta(tree)
File "/home/micdsouz/venv/local/lib/python2.7/site-packages/pubmed_parser/pubmed_oa_parser.py", line 60, in parse_article_meta
pmid_node = article_meta.find('article-id[@pub-id-type="pmid"]')
AttributeError: 'NoneType' object has no attribute 'find' [while running 'ReadData']
from __future__ import absolute_import
import argparse
import os
import logging
import re
import pubmed_parser as pp
import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
class ExtractXMLTags(beam.DoFn):
def process(self,element):
print('Current URL = {} '.format(element))
pubmed_dict = pp.parse_pubmed_xml(element)
print('Dictionary output = \n {}'.format(pubmed_dict))
yield pubmed_dict
def run(argv=None):
"""Main entry point; defines and runs the preprocessing pipeline."""
print('In Run - Begin processing')
# setup the argument parser and arguments
parser = argparse.ArgumentParser(description='program to preprocess the medline xml files and extract the important fields.')
#Add the arguments
print('Adding Arguments')
parser.add_argument(
'--input',
default='gs://medline-221810/medline/xml_files/',
help='Path to input files. Can be gs or local path')
parser.add_argument(
'--output',
default='gs://medline-221810/medline/xml_output_files/xml_data.txt',
help='Path to final output file.')
parser.add_argument(
'--batchsize',
default=50,
help='Batch size for the processing.')
parser.add_argument(
'--filenums',
default=50,
help='The number of filesin total to process.')
#Get the known and additional arguments sent
known_args, pipeline_args = parser.parse_known_args(argv)
# Set up the pipeline
# Specifiy the pipeline arguments
# include the parser folder under extra_packages
pipeline_args.extend([
'--runner=DataflowRunner',
'--project=medline-221810',
'--staging_location=gs://medline-221810/medline/staging',
'--temp_location=gs://medline-221810/medline/temp',
'--job_name=medline-preprocess-x1-job',
'--extra_package=pubmed_parser.tar.gz'
])
# setup pipeline options
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = True
# set the variables from command line arguments
num_xml_files=int(known_args.filenums)
batch_size=known_args.batchsize
uri=known_args.input
# Create a list of files to be processed
with beam.Pipeline(options=pipeline_options) as p:
# Read the text file[pattern] into a PCollection
print('Get the files and urls')
print('uri = {} '.format(uri))
gsurls = [os.path.join(uri,'pubmed18n%04d.xml.gz' % j)
for j in range(1, num_xml_files + 1)
if os.path.exists(os.path.join(uri,'pubmed18n%04d.xml.gz' % j))
]
print('gsurls = \n {}'.format(gsurls))
# build the pipeline
parsed_data = p | 'CreatePColData' >> beam.Create(gsurls) | 'ReadData' >> beam.ParDo(ExtractXMLTags())
print('Sent to pipeline ....')
print('Exiting run')
if __name__ == "__main__":
# Setup logger
logging.getLogger().setLevel(logging.DEBUG)
print('in main')
# call run to begin processing
run()
这看起来像是 pubmed_parser
库的问题。他们的跟踪器中有两个问题似乎与您的问题相符:
您好,我已经编写了一个 beam 管道来读取目录并使用 pubmed_parse 库解析下载的 pubmed xml 文件。该库通过标准 python 程序运行良好,但如果我将其转换为 apache beam 管道,如下所示,ti 在解析期间失败并出现错误: 希望能帮助解决这个问题
File "/home/micdsouz/venv/medline/data-preprocessing.py", line 19, in process
pubmed_dict = pp.parse_pubmed_xml(element)
File "/home/micdsouz/venv/local/lib/python2.7/site-packages/pubmed_parser/pubmed_oa_parser.py", line 112, in parse_pubmed_xml
dict_article_meta = parse_article_meta(tree)
File "/home/micdsouz/venv/local/lib/python2.7/site-packages/pubmed_parser/pubmed_oa_parser.py", line 60, in parse_article_meta
pmid_node = article_meta.find('article-id[@pub-id-type="pmid"]')
AttributeError: 'NoneType' object has no attribute 'find' [while running 'ReadData']
from __future__ import absolute_import
import argparse
import os
import logging
import re
import pubmed_parser as pp
import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
class ExtractXMLTags(beam.DoFn):
def process(self,element):
print('Current URL = {} '.format(element))
pubmed_dict = pp.parse_pubmed_xml(element)
print('Dictionary output = \n {}'.format(pubmed_dict))
yield pubmed_dict
def run(argv=None):
"""Main entry point; defines and runs the preprocessing pipeline."""
print('In Run - Begin processing')
# setup the argument parser and arguments
parser = argparse.ArgumentParser(description='program to preprocess the medline xml files and extract the important fields.')
#Add the arguments
print('Adding Arguments')
parser.add_argument(
'--input',
default='gs://medline-221810/medline/xml_files/',
help='Path to input files. Can be gs or local path')
parser.add_argument(
'--output',
default='gs://medline-221810/medline/xml_output_files/xml_data.txt',
help='Path to final output file.')
parser.add_argument(
'--batchsize',
default=50,
help='Batch size for the processing.')
parser.add_argument(
'--filenums',
default=50,
help='The number of filesin total to process.')
#Get the known and additional arguments sent
known_args, pipeline_args = parser.parse_known_args(argv)
# Set up the pipeline
# Specifiy the pipeline arguments
# include the parser folder under extra_packages
pipeline_args.extend([
'--runner=DataflowRunner',
'--project=medline-221810',
'--staging_location=gs://medline-221810/medline/staging',
'--temp_location=gs://medline-221810/medline/temp',
'--job_name=medline-preprocess-x1-job',
'--extra_package=pubmed_parser.tar.gz'
])
# setup pipeline options
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = True
# set the variables from command line arguments
num_xml_files=int(known_args.filenums)
batch_size=known_args.batchsize
uri=known_args.input
# Create a list of files to be processed
with beam.Pipeline(options=pipeline_options) as p:
# Read the text file[pattern] into a PCollection
print('Get the files and urls')
print('uri = {} '.format(uri))
gsurls = [os.path.join(uri,'pubmed18n%04d.xml.gz' % j)
for j in range(1, num_xml_files + 1)
if os.path.exists(os.path.join(uri,'pubmed18n%04d.xml.gz' % j))
]
print('gsurls = \n {}'.format(gsurls))
# build the pipeline
parsed_data = p | 'CreatePColData' >> beam.Create(gsurls) | 'ReadData' >> beam.ParDo(ExtractXMLTags())
print('Sent to pipeline ....')
print('Exiting run')
if __name__ == "__main__":
# Setup logger
logging.getLogger().setLevel(logging.DEBUG)
print('in main')
# call run to begin processing
run()
这看起来像是 pubmed_parser
库的问题。他们的跟踪器中有两个问题似乎与您的问题相符: