Reading RDF in Apache Spark
I am trying to read RDF/XML files into Apache Spark (Scala 2.11, Apache Spark 1.4.1) using Apache Jena. I wrote this Scala snippet:
val factory = new RdfXmlReaderFactory()
HadoopRdfIORegistry.addReaderFactory(factory)
val conf = new Configuration()
conf.set("rdf.io.input.ignore-bad-tuples", "false")
val data = sc.newAPIHadoopFile(path,
  classOf[RdfXmlInputFormat],
  classOf[LongWritable],   // position
  classOf[TripleWritable], // value
  conf)
data.take(10).foreach(println)
But it throws an error:
INFO readers.AbstractLineBasedNodeTupleReader: Got split with start 0 and length 21765995 for file with total length of 21765995
15/07/23 01:52:42 ERROR readers.AbstractLineBasedNodeTupleReader: Error parsing whole file, aborting further parsing
org.apache.jena.riot.RiotException: Producer failed to ever call start(), declaring producer dead
at org.apache.jena.riot.lang.PipedRDFIterator.hasNext(PipedRDFIterator.java:272)
at org.apache.jena.hadoop.rdf.io.input.readers.AbstractWholeFileNodeTupleReader.nextKeyValue(AbstractWholeFileNodeTupleReader.java:242)
at org.apache.jena.hadoop.rdf.io.input.readers.AbstractRdfReader.nextKeyValue(AbstractRdfReader.java:85)
at org.apache.spark.rdd.NewHadoopRDD$$anon.hasNext(NewHadoopRDD.scala:143)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at scala.collection.Iterator$$anon.hasNext(Iterator.scala:350)
...
ERROR executor.Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.io.IOException: Error parsing whole file at position 0, aborting further parsing
at org.apache.jena.hadoop.rdf.io.input.readers.AbstractWholeFileNodeTupleReader.nextKeyValue(AbstractWholeFileNodeTupleReader.java:285)
at org.apache.jena.hadoop.rdf.io.input.readers.AbstractRdfReader.nextKeyValue(AbstractRdfReader.java:85)
at org.apache.spark.rdd.NewHadoopRDD$$anon.hasNext(NewHadoopRDD.scala:143)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at scala.collection.Iterator$$anon.hasNext(Iterator.scala:350)
The file is fine, because I can parse it locally. What am I missing?
EDIT
Some information to reproduce the behaviour
Imports:
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.LongWritable
import org.apache.jena.hadoop.rdf.io.registry.HadoopRdfIORegistry
import org.apache.jena.hadoop.rdf.io.registry.readers.RdfXmlReaderFactory
import org.apache.jena.hadoop.rdf.types.QuadWritable
import org.apache.spark.SparkContext
scalaVersion := "2.11.7"
Dependencies:
"org.apache.hadoop" % "hadoop-common" % "2.7.1",
"org.apache.hadoop" % "hadoop-mapreduce-client-common" % "2.7.1",
"org.apache.hadoop" % "hadoop-streaming" % "2.7.1",
"org.apache.spark" % "spark-core_2.11" % "1.4.1",
"com.hp.hpl.jena" % "jena" % "2.6.4",
"org.apache.jena" % "jena-elephas-io" % "0.9.0",
"org.apache.jena" % "jena-elephas-mapreduce" % "0.9.0"
I am using a sample RDF from here. It's freely available information about John Peel sessions (more info about the dump).
Thanks to everyone for the discussion in the comments. The problem turned out to be really tricky and not obvious from the stack trace: the code needs one additional dependency to work, jena-core, and this dependency must be packaged before everything else.
"org.apache.jena" % "jena-core" % "2.13.0"
"com.hp.hpl.jena" % "jena" % "2.6.4"
I use this assembly merge strategy:
// sbt-assembly merge strategy: discard META-INF entries, keep the first copy of everything else
lazy val strategy = assemblyMergeStrategy in assembly <<= (assemblyMergeStrategy in assembly) { (old) => {
    case PathList("META-INF", xs @ _*) =>
      (xs map {_.toLowerCase}) match {
        case ("manifest.mf" :: Nil) | ("index.list" :: Nil) | ("dependencies" :: Nil) => MergeStrategy.discard
        case _ => MergeStrategy.discard
      }
    case x => MergeStrategy.first
  }
}
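This setting requires the sbt-assembly plugin; for an sbt 0.13-style build (which the <<= syntax above implies), it is enabled in project/plugins.sbt along these lines (the plugin version is only illustrative):
// project/plugins.sbt -- version is illustrative, any contemporary sbt-assembly release should work
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.1")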
It looks like your problem stems from managing the dependencies manually.
In my environment I simply pass the following to my Spark shell:
--packages org.apache.jena:jena-elephas-io:0.9.0
This resolves all the dependency concerns for you.
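For completeness, the full invocation looks roughly like this (the master URL here is just an example):
spark-shell --master local[2] --packages org.apache.jena:jena-elephas-io:0.9.0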
If you are building an SBT project, then the following in build.sbt is sufficient:
libraryDependencies += "org.apache.jena" % "jena-elephas-io" % "0.9.0"
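A minimal build.sbt for this route would then be roughly the sketch below; jena-elephas-io pulls in jena-core and the rest of Jena transitively, so none of them need to be listed by hand (the "provided" scope is just the usual choice for Spark deployments, drop it if you run the jar standalone):
scalaVersion := "2.11.7"

libraryDependencies ++= Seq(
  // Spark itself; "provided" is typical when the cluster supplies Spark, drop it for standalone runs
  "org.apache.spark" % "spark-core_2.11" % "1.4.1" % "provided",
  // brings in jena-core and the other Jena modules transitively
  "org.apache.jena" % "jena-elephas-io" % "0.9.0"
)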