SparkException:作业因阶段失败而中止:使用 Spark-Graphx 时出现 NullPointerException

SparkException: Job aborted due to stage failure: NullPointerException when working with Spark-Graphx

我是 scala 的新手,正在寻找解决此错误的方法。


我正在处理的场景是这样的。我有 3 tables:

只有用户才能发表评论,只有企业才能收到评论。该图将是这样的:

我要找的是:

For each user I want to know the other users that made a review to the same business

我做了这个动作来创建图表:

val users = sqlContext.sql("Select user_id as ID from user")
val business= sqlContext.sql("Select business_id as ID from business")
users.write.mode(SaveMode.Append).saveAsTable("user_busin_db")
business.write.mode(SaveMode.Append).saveAsTable("user_busin_db")
val user_bus = sqlContext.sql("Select ID from user_busin_db")
val reviews = sqlContext.sql("Select user_id, business_id from review")

table user_bus 将用于创建顶点。

之后,我用 GraphX 使用以下代码创建了图表:

def str2Long(s: String) = s.##.toLong

val vertex: RDD[(VertexId, String)] = user_bus.rdd.map(x => (str2Long(x(0).asInstanceOf[String]),(x(0).asInstanceOf[String])))
val edge:RDD[Edge[String]] = reviews.rdd.map(row => Edge(str2Long(row(0).asInstanceOf[String]), str2Long(row(1).asInstanceOf[String]), "review"))

val default = "missing"
val myGraph = Graph(vertex, edge, default)

myGraph.cache()

现在回答我的问题,我尝试使用以下代码为每个用户和企业做一个 aggregateMessages:

val userAggregate: VertexRDD[(List[Long])] = myGraph.aggregateMessages[(List[Long])](triplet => {
      triplet.sendToSrc((List(triplet.dstId)))
  },
  (a,b) => (a.union(b))
)

val businessAggregate: VertexRDD[(List[Long])] = myGraph.aggregateMessages[(List[Long])](triplet => {
      triplet.sendToDst((List(triplet.srcId)))
  },
  (a,b) => (a.union(b))
) 

然后是给我错误的代码。为了收集每个用户在同一业务中发表评论的其他用户是什么,我写了这个:

userAggregate.map(userAggr =>
    (userAggr._1, userAggr._2.flatMap(userAggrListElem =>
        userAggr._2.patch(0,businessAggregate.filter(busAggr => busAggr._1 == userAggrListElem).map(row => row._2).take(1)(0),userAggr._2.size+1))))

如果我尝试在其上使用 .collect 或 .count,我会收到此错误:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 138.0 failed 1 times, most recent failure: Lost task 1.0 in stage 138.0 (TID 2807, localhost): java.lang.NullPointerException
at org.apache.spark.graphx.impl.VertexRDDImpl.mapVertexPartitions(VertexRDDImpl.scala:94)
    at org.apache.spark.graphx.VertexRDD.filter(VertexRDD.scala:98)
    at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$$anonfun$apply.apply(<console>:102)
    at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$$anonfun$apply.apply(<console>:101)
    at scala.collection.immutable.List.flatMap(List.scala:327)
    at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun.apply(<console>:101)
    at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun.apply(<console>:100)
    at scala.collection.Iterator$$anon.next(Iterator.scala:409)
    at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1769)
    at org.apache.spark.rdd.RDD$$anonfun$count.apply(RDD.scala:1134)
    at org.apache.spark.rdd.RDD$$anonfun$count.apply(RDD.scala:1134)
    at org.apache.spark.SparkContext$$anonfun$runJob.apply(SparkContext.scala:1916)
    at org.apache.spark.SparkContext$$anonfun$runJob.apply(SparkContext.scala:1916)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
    at org.apache.spark.scheduler.Task.run(Task.scala:86)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:314)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1454)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage.apply(DAGScheduler.scala:1442)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage.apply(DAGScheduler.scala:1441)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1441)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed.apply(DAGScheduler.scala:811)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed.apply(DAGScheduler.scala:811)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1667)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1622)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1611)
    at org.apache.spark.util.EventLoop$$anon.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1890)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1903)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1916)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1930)
    at org.apache.spark.rdd.RDD.count(RDD.scala:1134)
    at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:105)
    at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:115)
    at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:117)
    at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:119)
    at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:121)
    at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:123)
    at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:125)
    at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:127)
    at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:129)
    at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:131)
    at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:133)
    at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:135)
    at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:137)
    at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:139)
    at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:141)
    at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:143)
    at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:145)
    at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw$$iw$$iw$$iw$$iw.<init>(<console>:147)
    at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw$$iw$$iw$$iw.<init>(<console>:149)
    at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw$$iw$$iw.<init>(<console>:151)
    at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw$$iw.<init>(<console>:153)
    at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw.<init>(<console>:155)
    at linea6ec9c0b0ced4184a0288c57eb3bdda585.$eval$.$print$lzycompute(<console>:7)
    at linea6ec9c0b0ced4184a0288c57eb3bdda585.$eval$.$print(<console>:6)
Caused by: java.lang.NullPointerException
    at org.apache.spark.graphx.impl.VertexRDDImpl.mapVertexPartitions(VertexRDDImpl.scala:94)
    at org.apache.spark.graphx.VertexRDD.filter(VertexRDD.scala:98)
    at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$$anonfun$apply.apply(<console>:102)
    at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$$anonfun$apply.apply(<console>:101)
    at scala.collection.immutable.List.flatMap(List.scala:327)
    at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun.apply(<console>:101)
    at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun.apply(<console>:100)
    at scala.collection.Iterator$$anon.next(Iterator.scala:409)
    at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1769)
    at org.apache.spark.rdd.RDD$$anonfun$count.apply(RDD.scala:1134)
    at org.apache.spark.rdd.RDD$$anonfun$count.apply(RDD.scala:1134)
    at org.apache.spark.SparkContext$$anonfun$runJob.apply(SparkContext.scala:1916)
    at org.apache.spark.SparkContext$$anonfun$runJob.apply(SparkContext.scala:1916)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
    at org.apache.spark.scheduler.Task.run(Task.scala:86)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:314)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)

如果我使用 userAggregate 的子集,该算法运行良好,事实上,如果我使用 take(1),我会得到这个结果:

Array[(org.apache.spark.graphx.VertexId, List[Long])] = Array((-1324024017,List(-1851582020, -1799460264, -1614007919, -1573604682, ...)))

即:(user_ID, List(user_id 对同一商家进行了评论,...)

现在我认为顶点有问题,某个地方有一个未连接的顶点给我 NullPointer 错误,但我无法找到它并从我的 grapf 中删除。我能做些什么来解决这个问题?

TL;DR 这不是有效的 Spark 代码。

这是预期的结果。不允许在 Apache Spark 中嵌套转换,因此您无法在 userAggregate.map 的闭包内访问 businessAggregate