Apache Beam python fails to parse pubmed XML

Hi, I have written a Beam pipeline that reads a directory and parses the downloaded PubMed XML files with the pubmed_parser library. The library works fine when run as a plain Python program, but when I turn it into the Apache Beam pipeline shown below, it fails during parsing with the error below. Any help resolving this would be appreciated.

File "/home/micdsouz/venv/medline/data-preprocessing.py", line 19, in process
    pubmed_dict = pp.parse_pubmed_xml(element)
  File "/home/micdsouz/venv/local/lib/python2.7/site-packages/pubmed_parser/pubmed_oa_parser.py", line 112, in parse_pubmed_xml
    dict_article_meta = parse_article_meta(tree)
  File "/home/micdsouz/venv/local/lib/python2.7/site-packages/pubmed_parser/pubmed_oa_parser.py", line 60, in parse_article_meta
    pmid_node = article_meta.find('article-id[@pub-id-type="pmid"]')
AttributeError: 'NoneType' object has no attribute 'find' [while running 'ReadData']
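
For reference, a minimal sketch of what the working standalone (non-Beam) usage looks like; the file name here is only an example of one of the downloaded files:

import pubmed_parser as pp

# Parse one downloaded file directly (the path is illustrative only).
pubmed_dict = pp.parse_pubmed_xml('pubmed18n0001.xml')
print(pubmed_dict)

The Beam version of the same step is the pipeline below.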

from __future__ import absolute_import
import argparse
import os
import logging
import re
import pubmed_parser as pp
import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions

class ExtractXMLTags(beam.DoFn):
    def process(self, element):
        print('Current URL = {} '.format(element))
        pubmed_dict = pp.parse_pubmed_xml(element)
        print('Dictionary output = \n {}'.format(pubmed_dict))
        yield pubmed_dict

def run(argv=None):
    """Main entry point; defines and runs the preprocessing  pipeline."""
    print('In Run - Begin processing')
    # setup the argument parser and arguments
    parser = argparse.ArgumentParser(description='program to preprocess the medline xml files and extract the important fields.')
    #Add the arguments
    print('Adding Arguments')
    parser.add_argument(
            '--input',
            default='gs://medline-221810/medline/xml_files/',
            help='Path to input files. Can be gs or local path')
    parser.add_argument(
            '--output',
            default='gs://medline-221810/medline/xml_output_files/xml_data.txt',
            help='Path to final output file.')
    parser.add_argument(
            '--batchsize',
            default=50,
            help='Batch size for the processing.')
    parser.add_argument(
            '--filenums',
            default=50,
            help='The number of files in total to process.')

    #Get the known and additional arguments sent
    known_args, pipeline_args = parser.parse_known_args(argv)

    # Set up the pipeline
    # Specify the pipeline arguments
    # include the parser folder under extra_packages
    pipeline_args.extend([
            '--runner=DataflowRunner',
            '--project=medline-221810',
            '--staging_location=gs://medline-221810/medline/staging',
            '--temp_location=gs://medline-221810/medline/temp',
            '--job_name=medline-preprocess-x1-job',
            '--extra_package=pubmed_parser.tar.gz'
            ])
    # setup pipeline options
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = True

    # set the variables from the command line arguments
    num_xml_files = int(known_args.filenums)
    batch_size = int(known_args.batchsize)
    uri = known_args.input

    # Create a list of files to be processed
    with beam.Pipeline(options=pipeline_options) as p:
        # Read the text file[pattern] into a PCollection
        print('Get the files and urls')
        print('uri = {} '.format(uri))
        gsurls = [os.path.join(uri,'pubmed18n%04d.xml.gz' % j)
                for j in range(1, num_xml_files + 1)
                if os.path.exists(os.path.join(uri,'pubmed18n%04d.xml.gz' % j))                
                ]

        print('gsurls = \n {}'.format(gsurls))
        # build the pipeline
        parsed_data = p | 'CreatePColData' >> beam.Create(gsurls) | 'ReadData' >> beam.ParDo(ExtractXMLTags())

        print('Sent to pipeline ....')
        print('Exiting run')


if __name__ == "__main__":
    # Set up logging
    logging.getLogger().setLevel(logging.DEBUG)
    print('in main')
    # call run to begin processing
    run()

This looks like a problem with the pubmed_parser library. There are two issues in their tracker that appear to match yours:
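
Until that is fixed upstream, one defensive option (just a sketch, not a fix in the library itself) is to catch the error inside the DoFn so a single file that triggers the bug does not fail the whole bundle:

import logging

import apache_beam as beam
import pubmed_parser as pp

class ExtractXMLTags(beam.DoFn):
    def process(self, element):
        try:
            pubmed_dict = pp.parse_pubmed_xml(element)
        except AttributeError:
            # parse_pubmed_xml raises AttributeError for these files (see the
            # traceback in the question); log and skip instead of crashing.
            logging.warning('Could not parse %s', element)
            return
        yield pubmed_dict

You will still want to find out which files trigger the error, but this at least keeps the pipeline running while the upstream issue is open.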