Can alpakka-xml process multiple XML files?
I'm having a problem using alpakka's XmlParsing Flow:
val files: List[String] = ... // file paths locally on disk

// simple source emitting the contents of 2 XML files
val documentSource = FileIO.fromPath(Paths.get(files.head))
  .concat(FileIO.fromPath(Paths.get(files(1))))

val contentFlow: Flow[ParseEvent, CustomContent, NotUsed] =
  Flow.fromGraph(new ContentProcessorFlow)

documentSource
  .via(XmlParsing.parser)
  .via(contentFlow)
  .to(Sink.foreach(println))
  .run()
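When this is run, the graph prints the elements emitted by contentFlow, which are correct and match the expected values for the first file. After that, this exception is thrown: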
[ERROR] [12/20/2018 16:32:23.648] [Sync-akka.actor.default-dispatcher-2] [akka://Sync/system/StreamSupervisor-0/flow-0-0-ignoreSink] Error in stage [akka.stream.alpakka.xml.impl.StreamingXmlParser@36b80955]: Illegal processing instruction target: 'xml' (case insensitive) is reserved by the xml specification
at [row,col {unknown-source}]: [44,17]
com.fasterxml.aalto.WFCException: Illegal processing instruction target: 'xml' (case insensitive) is reserved by the xml specification
at [row,col {unknown-source}]: [44,17]
at com.fasterxml.aalto.in.XmlScanner.reportInputProblem(XmlScanner.java:1333)
at com.fasterxml.aalto.async.AsyncByteScanner.checkPITargetName(AsyncByteScanner.java:665)
at com.fasterxml.aalto.async.AsyncByteArrayScanner.handlePI(AsyncByteArrayScanner.java:2091)
at com.fasterxml.aalto.async.AsyncByteArrayScanner.nextFromProlog(AsyncByteArrayScanner.java:1064)
at com.fasterxml.aalto.stax.StreamReaderImpl.next(StreamReaderImpl.java:802)
at akka.stream.alpakka.xml.impl.StreamingXmlParser$$anon.advanceParser(StreamingXmlParser.scala:55)
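I understand the basics of what's happening here: the parser is complaining about the ByteString containing the <?xml version="1.0" encoding="UTF-8"?> instruction at the top of the second file. But I'm not familiar enough with streams to know what to do about it. If I remove that instruction, I get an exception about there being 2 root elements instead.
My goal is to build a graph that reads files from some location and emits CustomContent for further processing. How can I modify this to treat each file as a distinct unit of input?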
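Both errors described above can be reproduced without Akka, because concatenating the bytes of two files hands the parser what looks like a single document with a second XML declaration and a second root element. The following is only a sketch using the JDK's SAX parser; the object name, the helper isWellFormed, and the two sample documents are hypothetical stand-ins, not part of the original code. Each document alone should be reported as well-formed, while the concatenation should not.

```scala
import java.io.ByteArrayInputStream
import java.nio.charset.StandardCharsets.UTF_8
import javax.xml.parsers.SAXParserFactory
import org.xml.sax.helpers.DefaultHandler
import scala.util.Try

object ConcatenatedXmlDemo extends App {
  // Hypothetical stand-ins for the contents of the two files on disk.
  val first  = """<?xml version="1.0" encoding="UTF-8"?><doc><item>1</item></doc>"""
  val second = """<?xml version="1.0" encoding="UTF-8"?><doc><item>2</item></doc>"""

  // True when the text parses as one well-formed XML document.
  // DefaultHandler rethrows fatal parse errors, which Try turns into a failure.
  def isWellFormed(xml: String): Boolean =
    Try {
      val parser = SAXParserFactory.newInstance().newSAXParser()
      parser.parse(new ByteArrayInputStream(xml.getBytes(UTF_8)), new DefaultHandler)
    }.isSuccess

  println(isWellFormed(first))          // first file on its own
  println(isWellFormed(second))         // second file on its own
  println(isWellFormed(first + second)) // what a single parser sees after .concat
}
```

The point of the sketch is that the problem lies in feeding one parser instance two documents, not in either file.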
Treat the files as distinct Sources, then merge them into a single Source:
val files: List[String] = ???

// one Source per file, each with its own XmlParsing.parser stage
val sources: List[Source[CustomContent, Future[IOResult]]] =
  files
    .map { f =>
      FileIO.fromPath(Paths.get(f))
        .via(XmlParsing.parser)
        .via(contentFlow)
    }

// concatenate the per-file sources in order
val mergedSource: Source[CustomContent, NotUsed] =
  Source(sources).flatMapConcat(identity)

mergedSource.runForeach(println)
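To try the same idea without files on disk, here is a minimal self-contained sketch. It makes several assumptions: Akka 2.6 or later (where an implicit ActorSystem supplies the materializer; on 2.5 an ActorMaterializer would be needed), the alpakka-xml module on the classpath, in-memory strings standing in for FileIO.fromPath, and a hypothetical textFlow standing in for contentFlow. The names PerDocumentParsing, textFlow and parseAll are illustrative only.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.alpakka.xml.{Characters, ParseEvent}
import akka.stream.alpakka.xml.scaladsl.XmlParsing
import akka.stream.scaladsl.{Flow, Sink, Source}
import akka.util.ByteString
import scala.concurrent.Await
import scala.concurrent.duration._

object PerDocumentParsing {
  // Hypothetical stand-in for contentFlow: keeps the non-blank text of each document.
  val textFlow: Flow[ParseEvent, String, NotUsed] =
    Flow[ParseEvent].collect {
      case c: Characters if c.text.trim.nonEmpty => c.text.trim
    }

  // Every document gets a fresh XmlParsing.parser stage, so each XML
  // declaration is seen at the start of its own parse.
  def parseAll(docs: List[String]): Source[String, NotUsed] =
    Source(docs).flatMapConcat { doc =>
      Source.single(ByteString(doc)) // in the original this is FileIO.fromPath(...)
        .via(XmlParsing.parser)
        .via(textFlow)
    }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("Sync")
    val docs = List(
      """<?xml version="1.0" encoding="UTF-8"?><doc>first</doc>""",
      """<?xml version="1.0" encoding="UTF-8"?><doc>second</doc>"""
    )
    // Collect everything and block briefly; acceptable for a demo only.
    val result = Await.result(parseAll(docs).runWith(Sink.seq), 10.seconds)
    println(result)
    system.terminate()
  }
}
```

Creating the inner source inside flatMapConcat, rather than building the whole list of sources up front, is a design choice of this sketch; either form gives each file its own parser stage and emits the results in file order.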