Spark: Work around nested RDD
There are two tables. The first table has records with two fields, book1 and book2. These are the ids of books that are usually read together, in pairs. The second table has books and readers columns, where books and readers are book and reader ids, respectively. For every reader in the second table I need to find the corresponding books in the pairs table. For example, if a reader read books 1, 2, 3 and we have the pairs (1,7), (6,2), (4,10), the resulting list for this reader should contain books 7 and 6.
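For intuition, here is the per-reader matching expressed over plain Scala collections (a minimal sketch with hypothetical names, no Spark involved):

val read = List(1, 2, 3)
val bookPairs = List((1, 7), (6, 2), (4, 10))
// keep the partner of every pair that mentions a book this reader read
val result = bookPairs.flatMap { case (a, b) =>
  if (read.contains(a)) List(b)
  else if (read.contains(b)) List(a)
  else Nil
}
// result == List(7, 6)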
I first group books by reader, then iterate over the pairs. I try to match every book in a pair against all the books in a user's list:
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.log4j.Logger
import org.apache.log4j.Level
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
object Simple {
  case class Pair(book1: Int, book2: Int)
  case class Book(book: Int, reader: Int, name: String)

  val pairs = Array(
    Pair(1, 2),
    Pair(1, 3),
    Pair(5, 7)
  )

  val testRecs = Array(
    Book(book = 1, reader = 710, name = "book1"),
    Book(book = 2, reader = 710, name = "book2"),
    Book(book = 3, reader = 710, name = "book3"),
    Book(book = 8, reader = 710, name = "book8"),
    Book(book = 1, reader = 720, name = "book1"),
    Book(book = 2, reader = 720, name = "book2"),
    Book(book = 8, reader = 720, name = "book8"),
    Book(book = 3, reader = 730, name = "book3"),
    Book(book = 8, reader = 740, name = "book8")
  )

  def main(args: Array[String]) {
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

    // set up environment
    val conf = new SparkConf()
      .setMaster("local[5]")
      .setAppName("Simple")
      .set("spark.executor.memory", "2g")
    val sc = new SparkContext(conf)
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.implicits._

    val pairsDf = sc.parallelize(pairs).toDF()
    val testData = sc.parallelize(testRecs)

    // *** Group test data by reader
    val testByReader = testData.map(r => (r.reader, r.book))
    val testGroups = testByReader.groupByKey()
    val x = testGroups.map(tuple => tuple match {
      case (user, bookIter) => matchList(user, pairsDf, bookIter.toList)
    })
    x.foreach(println)
  }

  def matchList(user: Int, df: DataFrame, toMatch: List[Int]) = {
    //val x = df.map(r => (r(0), r(1))) --- This also fails!!
    //x
    val relatedBooks = df.map(r => {
      val book1 = r(0)
      val book2 = r(1)
      val z = toMatch.map(book =>
        if (book == book1)
          List(book2)
        else {
          if (book == book2) List(book1)
          else List()
        } //if
      )
      z.flatMap(identity)
    })
    (user, relatedBooks)
  }
}
This results in a java.lang.NullPointerException (see below). As I understand it, Spark does not support nested RDDs. Please suggest another way to solve this task.
...
15/06/09 18:59:25 INFO Server: jetty-8.y.z-SNAPSHOT
15/06/09 18:59:25 INFO AbstractConnector: Started SocketConnector@0.0.0.0:44837
15/06/09 18:59:26 INFO Server: jetty-8.y.z-SNAPSHOT
15/06/09 18:59:26 INFO AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
[Stage 0:> (0 + 0) / 5]15/06/09 18:59:30 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 5)
java.lang.NullPointerException
at org.apache.spark.sql.DataFrame.schema(DataFrame.scala:253)
at org.apache.spark.sql.DataFrame.rdd(DataFrame.scala:961)
at org.apache.spark.sql.DataFrame.map(DataFrame.scala:848)
at Simple$.matchList(Simple.scala:60)
at Simple$$anonfun.apply(Simple.scala:52)
at Simple$$anonfun.apply(Simple.scala:51)
at scala.collection.Iterator$$anon.next(Iterator.scala:328)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at org.apache.spark.rdd.RDD$$anonfun$foreach.apply(RDD.scala:798)
at org.apache.spark.rdd.RDD$$anonfun$foreach.apply(RDD.scala:798)
at org.apache.spark.SparkContext$$anonfun$runJob.apply(SparkContext.scala:1498)
at org.apache.spark.SparkContext$$anonfun$runJob.apply(SparkContext.scala:1498)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
As you can see, we can't nest RDDs. One option would be to emit book-user pairs, then join that with the book info, and then group the result by user id. (Grouping by key is a bit sketchy, but assuming no user has read so many books that the book info for that user doesn't fit in memory, it should be fine.)
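A minimal sketch of that approach, reusing pairs, testRecs and sc from the question (the intermediate names are my own): emit each pair in both directions so a match on either column is found, join on book id, then group the related books by reader.

val pairsByBook = sc.parallelize(pairs)
  .flatMap(p => Seq((p.book1, p.book2), (p.book2, p.book1))) // both directions
val booksByReader = sc.parallelize(testRecs).map(b => (b.book, b.reader))
val related = booksByReader.join(pairsByBook) // (book, (reader, relatedBook))
  .map { case (_, (reader, relatedBook)) => (reader, relatedBook) }
  .groupByKey()
related.foreach(println)
// e.g. (710,CompactBuffer(2, 3, 1, 1)) -- books paired with what reader 710 read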
You can create two RDDs, one for the book pairs and one for the reader-book records, and then join the two RDDs by book id:
val bookpair = Array((1,2),(2,4),(3,4),(5,6),(4,6),(7,3))
val bookpairRdd = sc.parallelize(bookpair)
val readerbook = Array(("foo",1),("bar",2),("user1",3),("user3",4))
val readerRdd = sc.parallelize(readerbook).map(x => x.swap) // key by book id
val joinedRdd = readerRdd.join(bookpairRdd)                 // (bookId, (reader, pairedBook))
joinedRdd.foreach(println)
(4,(user3,6))
(3,(user1,4))
(2,(bar,4))
(1,(foo,2))
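Note that, as written, this join only matches a reader's book against the first element of each pair (both RDDs are keyed by that id). If a reader's book may also appear as the second element of a pair, you could emit both orientations before joining, for example:

val symmetricPairs = bookpairRdd.flatMap { case (a, b) => Seq((a, b), (b, a)) }
val joinedBoth = readerRdd.join(symmetricPairs)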