使用 XmlIo 在 apache beam 中读取 xml 文件

Reading an xml file in apache beam using XmlIo

问题陈述: 我正在尝试使用直接运行器在 beam 中读取和打印 xml 文件的内容 这是代码片段:

 public  class  BookStore{

 public  static  void  main  (string  args[]){

 BookOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(BookOptions .class); 

 Pipeline pipeline = Pipeline.create(options);

 PCollection<Book> output = pipeline.apply(XmlIO.<Book>read().from("sample.xml")
                 .withRootElement("book") 
                 .withRecordElement("name")
                 .withRecordClass(Book.class));  

         output.apply(ParDo.of(new DoFn<Book,String>(){
             @ProcessElement 
             public void processElement(ProcessContext c)
             {
                 System.out.println("xml  data "+c.element().getname());    
             }
          }));
 pipeline.run();
}
}

我的 pojo class:


@XmlRootElement(name = "book")
@XmlType(propOrder = {"name"})
public class Book{

    private String name;
    @XmlElement(name = "name")
    public String getName ()
    {
    return name;
    }

    public void setName (String name)
    {
    this.name = name;
    }

    @Override
    public String toString()
    {
    return "ClassPojo [name= "+name+"]";
    }

}

我的sample.xml文件

<?xml version="1.0" encoding="UTF-8"?> 
<book>
   <name>Harrypotter</name>
</book>

当我使用直接运行程序执行上述代码时,我得到 "name" 的输出为 null

有人可以指导我吗。

有没有我可以参考的例子......?

您的 XML 文件与您在管道中定义的 XmlIO 选项不对应 - 您需要有一个包含您的记录(书籍)的根元素。解决方案之一可能是这样的:

PCollection<Book> output = pipeline.apply(
        XmlIO.<Book>read().from("sample.xml")
            .withRootElement("books")
            .withRecordElement("book")
            .withRecordClass(Book.class));

和 XML 文件应如下所示:

<?xml version="1.0" encoding="UTF-8"?>
<books>
    <book>
        <name>Harrypotter</name>
    </book>
</books>

在使用较新版本的 beam-sdks-java-core (2.31.0)

时,只需为像我这样的管道新手添加问题中的可运行版本
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.xml.XmlIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;

public class BookStore {
    public static void main(String[] args) {

        var pipeline = Pipeline.create(PipelineOptionsFactory.create());

        pipeline
                .apply(XmlIO.<Book>read().from("sample.xml")
                        .withRootElement("books")
                        .withRecordElement("book")
                        .withRecordClass(Book.class))
                .apply(ParDo.of(new DoFn<Book, String>() {

                    @ProcessElement
                    public void processElement(
                            @Element Book element, OutputReceiver<String> receiver) {

                        System.out.println("Xml data: " + element.getName());
                        receiver.output(element.getName());
                    }
                }));

        pipeline.run();
    }
}

备选方案:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.xml.XmlIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptors;

public class BookStore {
    public static void main(String[] args) {

        var pipeline = Pipeline.create(PipelineOptionsFactory.create());

        pipeline
                .apply(XmlIO.<Book>read().from("sample.xml")
                        .withRootElement("books")
                        .withRecordElement("book")
                        .withRecordClass(Book.class))
                .apply(MapElements.into(TypeDescriptors.strings())
                        .via((Book book) -> {
                            System.out.println("Xml data: " + book.getName());
                            return book.getName();
                        }));

        pipeline.run();
    }
}