星火:aggregateByKey成一对列表

Spark: aggregateByKey into a pair of lists

我有一组键控记录,其中包含图书 ID 以及 reader ID 字段。

case class Book(book: Int, reader: Int)

如何使用 aggregateByKey 将具有相同键的所有记录合并为以下格式的一条记录:

(key:Int, (books: List:[Int], readers: List:[Int])) 

其中 books 是所有书籍的列表,readers 是具有给定键的记录中所有 readers 的列表?

我的代码(如下)导致编译错误:

import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkContext, SparkConf}

object Aggr {

  case class Book(book: Int, reader: Int)

  val bookArray = Array(
      (2,Book(book = 1, reader = 700)),
      (3,Book(book = 2, reader = 710)),
      (4,Book(book = 3, reader = 710)),
      (2,Book(book = 8, reader = 710)),
      (3,Book(book = 1, reader = 720)),
      (4,Book(book = 2, reader = 720)),
      (4,Book(book = 8, reader = 720)),
      (3,Book(book = 3, reader = 730)),
      (4,Book(book = 8, reader = 740))
  )

  def main(args: Array[String]) {
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
    // set up environment
    val conf = new SparkConf()
      .setMaster("local[5]")
      .setAppName("Aggr")
      .set("spark.executor.memory", "2g")
    val sc = new SparkContext(conf)

    val books = sc.parallelize(bookArray)
    val aggr = books.aggregateByKey((List()[Int], List()[Int]))
    ({case
      ((bookList:List[Int],readerList:List[Int]), Book(book, reader)) =>
      (bookList ++ List(book), readerList ++ List(reader))
      },
    {case ((bookLst1:List[Int], readerLst1:List[Int]),
    (bookLst2:List[Int], readerLst2:List[Int])
      ) => (bookLst1 ++ bookLst2, readerLst1 ++ readerLst2) })


  }
}

错误:

Error:(36, 44) object Nil does not take type parameters.
val aggr = books.aggregateByKey((List()[Int], List()[Int]))

Error:(37, 6) missing parameter type for expanded function The argument types of an anonymous function must be fully known. (SLS 8.5) Expected type was: ?
({case
 ^
                                       ^

更新

当使用 (List(0), List(0) 初始化累加器时,一切都会编译,但会在结果中插入额外的零。很有趣:

val aggr :  RDD[(Int, (List[Int], List[Int]))] = books.aggregateByKey((List(0), List(0))) (
{case
  ((bookList:List[Int],readerList:List[Int]), Book(book, reader)) =>
  (bookList ++ List(book), readerList ++ List(reader))
  },
{case ((bookLst1:List[Int], readerLst1:List[Int]),
(bookLst2:List[Int], readerLst2:List[Int])
  ) => (bookLst1 ++ bookLst2, readerLst1 ++ readerLst2) }
)

这导致以下输出:

[Stage 0:>                                                          (0 + 0) / 5](2,(List(0, 1, 0, 8),List(0, 700, 0, 710)))
(3,(List(0, 2, 0, 1, 0, 3),List(0, 710, 0, 720, 0, 730)))
(4,(List(0, 3, 0, 2, 8, 0, 8),List(0, 710, 0, 720, 720, 0, 740)))

假设我可以将空列表作为初始值设定项而不是带零的列表,我当然不会有额外的零,列表会很好地连接起来。

谁能解释一下为什么空列表初始值设定项 (List(), List() 会导致错误并且 (List(0), List(0) 可以编译。是 Scala 错误还是功能?

其实你做的都还行,只是你的indentation/syntax风格有点马虎,你只需要移去一个括号就可以了:

val aggr = books.aggregateByKey((List()[Int], List()[Int]))
({case

进入这个:

val aggr = books.aggregateByKey((List[Int](), List[Int]())) (
    {case

这些链接可能会阐明为什么这对您不起作用:

What are the precise rules for when you can omit parenthesis, dots, braces, = (functions), etc.?(第一个答案)

http://docs.scala-lang.org/style/method-invocation.html#suffix-notation

正在回答您的更新 - 您放错了列表的类型声明。如果您将它们声明为 List[Int]() 而不是 List()[Int],一切都会有效。编译器错误消息正确地告诉了您问题所在,但不太容易理解。通过将 [Int] 放在末尾,您将类型参数传递给 List() 函数的 resultList() 的结果是 Nil - 表示空列表的单例对象 - 它不采用类型参数。

至于为什么 List(0) 也有效 - scala 执行 类型推断,如果可以的话。您已经声明了一个列表的一个元素——它是 0,一个整数,所以它推断这是一个 List[Int]。但是请注意,这并没有声明一个空列表,而是一个带有单个零的列表。您可能想改用 List[Int]()

仅使用 List() 是行不通的,因为 scala 无法推断空列表的类型。