Spark: Work around nested RDD

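There are two tables. The first table has records with two fields, book1 and book2. These are IDs of books that are usually read together, in pairs. The second table has columns books and readers, where books and readers are the book ID and the reader ID, respectively. For each reader in the second table, I need to find the corresponding books in the pairs table. For example, if a reader has read books 1, 2, 3 and we have the pairs (1,7), (6,2), (4,10), the resulting list for this reader should contain books 7 and 6.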

I first group the books by reader and then iterate over the pairs. I try to match each book in a pair against all the books in the reader's list:

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.log4j.Logger
import org.apache.log4j.Level
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._


object Simple {

  case class Pair(book1: Int, book2: Int)
  case class Book(book: Int, reader: Int, name:String)

  val pairs = Array(
    Pair(1, 2),
    Pair(1, 3),
    Pair(5, 7)
  )

  val testRecs = Array(
    Book(book = 1, reader = 710, name = "book1"),
    Book(book = 2, reader = 710, name = "book2"),
    Book(book = 3, reader = 710, name = "book3"),
    Book(book = 8, reader = 710, name = "book8"),
    Book(book = 1, reader = 720, name = "book1"),
    Book(book = 2, reader = 720, name = "book2"),
    Book(book = 8, reader = 720, name = "book8"),
    Book(book = 3, reader = 730, name = "book3"),
    Book(book = 8, reader = 740, name = "book8")
  )

  def main(args: Array[String]) {
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
    // set up environment
    val conf = new SparkConf()
      .setMaster("local[5]")
      .setAppName("Simple")
      .set("spark.executor.memory", "2g")
    val sc = new SparkContext(conf)
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.implicits._

    val pairsDf = sc.parallelize(pairs).toDF()
    val testData = sc.parallelize(testRecs)

    // *** Group test data by reader
    val testByReader = testData.map(r => (r.reader, r.book))
    val testGroups = testByReader.groupByKey()
    val x = testGroups.map(tuple => tuple match {
      case(user, bookIter) => matchList(user,pairsDf, bookIter.toList)
    })
    x.foreach(println)
  }

  def matchList(user:Int, df: DataFrame, toMatch: List[Int]) = {
    //val x = df.map(r => (r(0), r(1))) --- This also fails!!
    //x
    val relatedBooks = df.map(r => {
      val book1 = r(0)
      val book2 = r(1)
      val z = toMatch.map(book =>
        if (book == book1)
          List(book2)
        else {
          if (book == book2) List(book1)
          else List()
        } //if
      )
      z.flatMap(identity)
    })
    (user,relatedBooks)
  }
}

This results in a java.lang.NullPointerException (below). As I understand it, Spark does not support nested RDDs. Please suggest another way to solve this task.

...
15/06/09 18:59:25 INFO Server: jetty-8.y.z-SNAPSHOT
15/06/09 18:59:25 INFO AbstractConnector: Started SocketConnector@0.0.0.0:44837
15/06/09 18:59:26 INFO Server: jetty-8.y.z-SNAPSHOT
15/06/09 18:59:26 INFO AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
[Stage 0:>                                                          (0 + 0) / 5]15/06/09 18:59:30 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 5)
java.lang.NullPointerException
    at org.apache.spark.sql.DataFrame.schema(DataFrame.scala:253)
    at org.apache.spark.sql.DataFrame.rdd(DataFrame.scala:961)
    at org.apache.spark.sql.DataFrame.map(DataFrame.scala:848)
    at Simple$.matchList(Simple.scala:60)
    at Simple$$anonfun.apply(Simple.scala:52)
    at Simple$$anonfun.apply(Simple.scala:51)
    at scala.collection.Iterator$$anon.next(Iterator.scala:328)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at org.apache.spark.rdd.RDD$$anonfun$foreach.apply(RDD.scala:798)
    at org.apache.spark.rdd.RDD$$anonfun$foreach.apply(RDD.scala:798)
    at org.apache.spark.SparkContext$$anonfun$runJob.apply(SparkContext.scala:1498)
    at org.apache.spark.SparkContext$$anonfun$runJob.apply(SparkContext.scala:1498)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:64)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)

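As you've noticed, we can't nest RDDs. One option is to emit book-user pairs, join them with the book info, and then group the results by user ID. Grouping by key is a bit sketchy, but it should be fine as long as no user has read so many books that the book info for that user doesn't fit in memory.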
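The steps above can be sketched roughly as follows, using the example from the question. This is only a sketch under assumptions: the object name RelatedBooks and the helper relatedByReader are hypothetical, the inputs are small in-memory sequences, and each pair is emitted in both directions so that a match on either book1 or book2 is found.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._ // pair-RDD implicits; only required before Spark 1.3

object RelatedBooks {

  // Hypothetical helper: pairs are (book1, book2), bookReader is (book, reader).
  def relatedByReader(sc: SparkContext,
                      pairs: Seq[(Int, Int)],
                      bookReader: Seq[(Int, Int)]): Map[Int, List[Int]] = {
    // Emit every pair in both directions so a match on either side is found.
    val related = sc.parallelize(pairs).flatMap { case (a, b) => Seq((a, b), (b, a)) }

    sc.parallelize(bookReader)
      .join(related)                                   // (book, (reader, relatedBook))
      .map { case (_, (reader, other)) => (reader, other) }
      .distinct()                                      // drop duplicate matches
      .groupByKey()                                    // all related books per reader
      .mapValues(_.toList.sorted)
      .collect()
      .toMap
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("RelatedBooks"))

    // Example from the question: reader 710 read books 1, 2 and 3.
    val result = relatedByReader(
      sc,
      pairs = Seq((1, 7), (6, 2), (4, 10)),
      bookReader = Seq((1, 710), (2, 710), (3, 710)))

    println(result) // Map(710 -> List(6, 7))
    sc.stop()
  }
}
```

Because this is an inner join, readers whose books appear in no pair are simply absent from the result.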

You can create two RDDs, one for bookpair and one for readerbook, and then join the two RDDs by book ID.

val bookpair = Array((1,2),(2,4),(3,4),(5,6),(4,6),(7,3))
val bookpairRdd = sc.parallelize(bookpair)
val readerbook = Array(("foo",1),("bar",2),("user1",3),("user3",4))
val readerRdd = sc.parallelize(readerbook).map(x => x.swap)
val joinedRdd = readerRdd.join(bookpairRdd)
joinedRdd.foreach(println)

(4,(user3,6))
(3,(user1,4))
(2,(bar,4))
(1,(foo,2))
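Note that the join above is keyed on the first element of each pair only, so it finds a match only when the reader's book is the first book of a pair. If the pairs table is small, another possible workaround (a different technique from the join, not part of the answers above) is to broadcast the pairs as a plain Scala collection and look them up inside the closure. The sketch below assumes the pairs fit in memory on every executor; the object name RelatedBooksBroadcast and the helper related are hypothetical.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._ // pair-RDD implicits; only required before Spark 1.3

object RelatedBooksBroadcast {

  // Plain Scala (no RDD involved): books paired with any book in `read`,
  // matching on either side of a pair.
  def related(read: Set[Int], pairs: Seq[(Int, Int)]): List[Int] =
    pairs.flatMap { case (a, b) =>
      (if (read(a)) Seq(b) else Seq.empty[Int]) ++
        (if (read(b)) Seq(a) else Seq.empty[Int])
    }.distinct.sorted.toList

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("RelatedBooksBroadcast"))

    // Assumption: the pairs table is small enough to ship to every executor.
    val pairs = sc.broadcast(Seq((1, 2), (1, 3), (5, 7)))

    // (reader, book) records, taken from the question's test data.
    val readerBook = sc.parallelize(Seq(
      (710, 1), (710, 2), (710, 3), (710, 8),
      (720, 1), (720, 2), (720, 8),
      (730, 3), (740, 8)))

    // Only a local collection is used inside the closure, so nothing is nested.
    val result = readerBook
      .groupByKey()
      .mapValues(books => related(books.toSet, pairs.value))

    result.collect().foreach(println)
    sc.stop()
  }
}
```

This keeps the shape of the original matchList, but the closure captures a broadcast collection instead of a DataFrame, which is what triggered the NullPointerException on the executors.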