在 Java 中使用多线程解析 XML

Parse XML using Multiple Threads In Java

我使用 JAXB 在 java 中读取了大约 4 GB 的大型 XML,我有一个带有 SSD、RAM 和多个 CPU 内核的良好系统。我想使用多个线程读取 XML 文件。我已经对其进行了研究,但尚未找到任何解决方案。

我在想,如果我可以使用多个线程读取 XML 并发送字节块以通过 XML 解析器进行解析,那会很好,但想知道是否已经存在解决方案与实施。

我的代码片段是

public void parseXML() throws Exception{

    try(InputStream is = new BufferedInputStream(new FileInputStream(xmlFile),XML_READ_BUFFER)){
    //try(InputStream is = new ByteArrayInputStream(removeAnd.getBytes(StandardCharsets.UTF_16))){ 
        XMLInputFactory xmlif = XMLInputFactory.newInstance();
        XMLStreamReader sr = xmlif.createXMLStreamReader(is);

        JAXBContext ctx = JAXBContext.newInstance(XwaysImage.class);
        Unmarshaller unmar = ctx.createUnmarshaller();

        int c=0;
        while (sr.hasNext()){

            while(this.pause.get())Thread.sleep(100);
            if(this.cancel.get()) break;

            int eventType = sr.next();
            if(eventType == XMLStreamConstants.START_ELEMENT){
                if("ImageFile".equals(sr.getName().getLocalPart())){
                    XwaysImage xim = unmar.unmarshal(sr,XwaysImage.class).getValue();
                    //TODO code here. 
                }
            }
        }
        sr.close();
        is.close();
    }catch(Exception e){
        log.error("",e);
    }
}

不确定我是否完全理解您的代码的哪一部分需要并发,但如果它是您的 while 循环,您可以尝试:

    sr.parallelStream().forEach(-> {
     //do something
})

因为这不是 DOM 风格的解析器,所以从磁盘低级读取 XML 文件的速度很快,尤其是从 SSD。所以不要认为多线程阅读会有帮助。

但是检索到的数据的多线程处理可以提高整体性能,而不是 'read the XML using multiple Threads and send the chunks of bytes to parse' 尝试在单线程中读取,但并行处理。

已经有项目尝试将并行处理应用于 XML 解析——参见示例 https://www.ibm.com/support/knowledgecenter/en/SSZJPZ_8.7.0/com.ibm.swg.im.iis.ds.stages.xml.core.usage.doc/topics/largescaleparallelparsing.html——但我不知道是否有可在实践中使用的工具。从本质上讲,这不是一项可以轻松并行化为独立线程的任务。

无论如何,解析的成本是多少?在许多应用中,25% 可能是典型值。如果您是这种情况,那么最好的方法可能是让一个线程执行解析,而其他线程处理解析后的数据。

也许你可以试试 Declarative Stream Mapping (DSM) 库。它非常适合处理大型或复杂的 XML 和 JSON 文档。您需要在 YAML 文件中定义 class 结束 XML 数据之间的映射。

例如,假设您有以下 xml 文件:

<root>
  <item >
    <id>1</id>
    <name>Item 1</name>
  </item>
  <item >
    <id>2</id>
    <name>Item 2</name>
    <date>13/06/2019</date>
  </item>
  <item >
    <id>3</id>
    <name>Item 3</name>
    <date>11/06/2019</date>
  </item>
  <!-- 
  .........
  -->
</root>

为要处理的数据定义映射

result:
   type: object  // it will only store one item in memory.
   path: /root/item    # path is regex can be writen as "/.+item".
   function: processData   # call processData function for every item.
   filter: self.index%params.threadCount==params.threadNo  // you can write script to filter data.
   fields:
     id: long   # id dataType long
     name:      # default dataType string         
     registerDate:   
        path: date
        dataType: date   # data type is date
        dataTypeParams: 
           dateFormat: dd/MM/yyyy  # date format

编写函数来执行您的数据并将其注册到映射文件,如上所示。

FunctionExecutor processData = new FunctionExecutor() {
        @Override
        public void execute(Params params) {
            System.out.println(params.getCurrentNode().getData());
        }
    };

    // java 8+
    //FunctionExecutor processData = params->System.out.println(params.getCurrentNode().getData());

这是 java 代码。您可以为每个线程设置 threadNo。我假设您将 运行 在 10 个线程中编写代码。 对于此示例线程不是 1。这意味着您将只处理与映射文件中的 filter 字段匹配的项目。

DSMBuilder builder = new DSMBuilder("path/to/mapping.yaml");
    builder.registerFunction("processData ", processData); // register function
        builder.getParams().put("threadCount", 10);
        builder.getParams().put("threadNo", 1);  // run for first thread
    DSM dsm = builder.create();
    // process json data
    Object object = dsm.toObject("path/to/data.xml");