Spark java.lang.NullPointerException when using tuples
I am building a graph with the GraphX API for Spark and processing it with the Pregel API. The error does not occur if I return the argument tuple from the vprog function, but it does occur if I return a new tuple built from that same tuple's members.
Here is the relevant code:
val verticesRDD = cleanDtaDF.select("ChildHash", "DN").rdd.map(row => (row(0).toString.toLong, (row(1).toString.toDouble, row(0).toString.toLong)))
val edgesRDD = (rawDtaDF.select("ChildHash", "ParentHash", "dealer_code", "dealer_customer_number", "parent_dealer_cust_number").rdd
  .map(row => Edge(row.get(0).toString.toLong, row.get(1).toString.toLong, (row(3) + " is a child of " + row(4), " when dealer is " + row.get(2)))))
val myGraph = Graph(verticesRDD, edgesRDD)
def vprog(vertexId: VertexId, vertexDTA: (Double, Long), msg: Double): (Double, Long) = {
  (vertexDTA._1, vertexDTA._2)
}
val result = myGraph.pregel(0.0, 1, activeDirection = EdgeDirection.Out)(vprog, t => Iterator((t.dstId, t.srcAttr._2)), (x, y) => x + y)
The error does not occur if I make a simple change to vprog(...) so that it does not access the tuple's members:
def vprog(vertexId: VertexId, vertexDTA: (Double, Long), msg: Double): (Double, Long) = {
  vertexDTA
}
The error is:
[Stage 101:> (0 + 0) / 200][Stage 102:> (0 + 4) / 200]18/03/10 20:43:16 WARN scheduler.TaskSetManager: Lost task 1.0 in stage 102.0 (TID 5959, ue1lslaved25.na.aws.cat.com, executor 146): java.lang.NullPointerException
at $line69.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.vprog(<console>:60)
at $line70.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun.apply(<console>:75)
at $line70.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun.apply(<console>:75)
at org.apache.spark.graphx.Pregel$$anonfun.apply(Pregel.scala:125)
at org.apache.spark.graphx.Pregel$$anonfun.apply(Pregel.scala:125)
at org.apache.spark.graphx.impl.VertexPartitionBaseOps.map(VertexPartitionBaseOps.scala:61)
at org.apache.spark.graphx.impl.GraphImpl$$anonfun.apply(GraphImpl.scala:129)
at org.apache.spark.graphx.impl.GraphImpl$$anonfun.apply(GraphImpl.scala:129)
at scala.collection.Iterator$$anon.next(Iterator.scala:409)
at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:216)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator.apply(BlockManager.scala:988)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator.apply(BlockManager.scala:979)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:919)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:979)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:697)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
This issue has a simple explanation; it is not specific to Spark or GraphX.
Consider this function (with the parts that are irrelevant to the question removed):
def vprog(vertexDTA: (Double, Long)): (Double, Long) = {
  (vertexDTA._1, vertexDTA._2)
}
If the argument vertexDTA is null, then both vertexDTA._1 and vertexDTA._2 throw a NullPointerException.
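This is plain JVM behavior and can be demonstrated without Spark. A minimal, self-contained sketch (the object and method names here are mine, chosen just for illustration):

object NullTupleDemo {
  // Same shape as the failing vprog: building a new tuple from the members
  // dereferences vertexDTA, which throws if the reference is null.
  def vprog(vertexDTA: (Double, Long)): (Double, Long) =
    (vertexDTA._1, vertexDTA._2)

  def main(args: Array[String]): Unit = {
    try {
      vprog(null) // tuples are references, so null is a legal (if unwelcome) value
    } catch {
      case e: NullPointerException => println("NPE, as in the Spark job: " + e)
    }
  }
}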
If we change the function to
def vprog(vertexDTA: (Double, Long)): (Double, Long) = {
  vertexDTA
}
then when the argument is null it is simply returned as-is; the tuple's members are never accessed, so there is no NPE.
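As for why the vertex attribute can be null here at all: one plausible source (an assumption on my part, not something the stack trace alone proves) is vertex IDs that appear in edgesRDD but are missing from verticesRDD. The Graph(vertices, edges) constructor fills such vertices in with a default attribute, and that default is null unless you pass one explicitly. A hedged sketch of a defensive fix, using a placeholder sentinel value:

import org.apache.spark.graphx.Graph

// Supply an explicit default attribute so that vertices occurring only in
// edgesRDD never carry a null (Double, Long) tuple.
// (0.0, -1L) is a hypothetical sentinel; pick one that suits the data.
val myGraph = Graph(verticesRDD, edgesRDD, defaultVertexAttr = (0.0, -1L))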