星火:aggregateByKey成一对列表
Spark: aggregateByKey into a pair of lists
我有一组键控记录,其中包含图书 ID 以及 reader ID 字段。
case class Book(book: Int, reader: Int)
如何使用 aggregateByKey
将具有相同键的所有记录合并为以下格式的一条记录:
(key:Int, (books: List:[Int], readers: List:[Int]))
其中 books 是所有书籍的列表,readers 是具有给定键的记录中所有 readers 的列表?
我的代码(如下)导致编译错误:
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkContext, SparkConf}
object Aggr {
case class Book(book: Int, reader: Int)
val bookArray = Array(
(2,Book(book = 1, reader = 700)),
(3,Book(book = 2, reader = 710)),
(4,Book(book = 3, reader = 710)),
(2,Book(book = 8, reader = 710)),
(3,Book(book = 1, reader = 720)),
(4,Book(book = 2, reader = 720)),
(4,Book(book = 8, reader = 720)),
(3,Book(book = 3, reader = 730)),
(4,Book(book = 8, reader = 740))
)
def main(args: Array[String]) {
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
// set up environment
val conf = new SparkConf()
.setMaster("local[5]")
.setAppName("Aggr")
.set("spark.executor.memory", "2g")
val sc = new SparkContext(conf)
val books = sc.parallelize(bookArray)
val aggr = books.aggregateByKey((List()[Int], List()[Int]))
({case
((bookList:List[Int],readerList:List[Int]), Book(book, reader)) =>
(bookList ++ List(book), readerList ++ List(reader))
},
{case ((bookLst1:List[Int], readerLst1:List[Int]),
(bookLst2:List[Int], readerLst2:List[Int])
) => (bookLst1 ++ bookLst2, readerLst1 ++ readerLst2) })
}
}
错误:
Error:(36, 44) object Nil does not take type parameters.
val aggr = books.aggregateByKey((List()[Int], List()[Int]))
Error:(37, 6) missing parameter type for expanded function The argument types of an anonymous function must be fully known. (SLS 8.5) Expected type was: ?
({case
^
^
更新
当使用 (List(0), List(0)
初始化累加器时,一切都会编译,但会在结果中插入额外的零。很有趣:
val aggr : RDD[(Int, (List[Int], List[Int]))] = books.aggregateByKey((List(0), List(0))) (
{case
((bookList:List[Int],readerList:List[Int]), Book(book, reader)) =>
(bookList ++ List(book), readerList ++ List(reader))
},
{case ((bookLst1:List[Int], readerLst1:List[Int]),
(bookLst2:List[Int], readerLst2:List[Int])
) => (bookLst1 ++ bookLst2, readerLst1 ++ readerLst2) }
)
这导致以下输出:
[Stage 0:> (0 + 0) / 5](2,(List(0, 1, 0, 8),List(0, 700, 0, 710)))
(3,(List(0, 2, 0, 1, 0, 3),List(0, 710, 0, 720, 0, 730)))
(4,(List(0, 3, 0, 2, 8, 0, 8),List(0, 710, 0, 720, 720, 0, 740)))
假设我可以将空列表作为初始值设定项而不是带零的列表,我当然不会有额外的零,列表会很好地连接起来。
谁能解释一下为什么空列表初始值设定项 (List(), List()
会导致错误并且 (List(0), List(0)
可以编译。是 Scala 错误还是功能?
其实你做的都还行,只是你的indentation/syntax风格有点马虎,你只需要移去一个括号就可以了:
val aggr = books.aggregateByKey((List()[Int], List()[Int]))
({case
进入这个:
val aggr = books.aggregateByKey((List[Int](), List[Int]())) (
{case
这些链接可能会阐明为什么这对您不起作用:
What are the precise rules for when you can omit parenthesis, dots, braces, = (functions), etc.?(第一个答案)
http://docs.scala-lang.org/style/method-invocation.html#suffix-notation
正在回答您的更新 - 您放错了列表的类型声明。如果您将它们声明为 List[Int]()
而不是 List()[Int]
,一切都会有效。编译器错误消息正确地告诉了您问题所在,但不太容易理解。通过将 [Int]
放在末尾,您将类型参数传递给 List()
函数的 result。 List()
的结果是 Nil
- 表示空列表的单例对象 - 它不采用类型参数。
至于为什么 List(0)
也有效 - scala 执行 类型推断,如果可以的话。您已经声明了一个列表的一个元素——它是 0,一个整数,所以它推断这是一个 List[Int]
。但是请注意,这并没有声明一个空列表,而是一个带有单个零的列表。您可能想改用 List[Int]()
。
仅使用 List()
是行不通的,因为 scala 无法推断空列表的类型。
我有一组键控记录,其中包含图书 ID 以及 reader ID 字段。
case class Book(book: Int, reader: Int)
如何使用 aggregateByKey
将具有相同键的所有记录合并为以下格式的一条记录:
(key:Int, (books: List:[Int], readers: List:[Int]))
其中 books 是所有书籍的列表,readers 是具有给定键的记录中所有 readers 的列表?
我的代码(如下)导致编译错误:
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkContext, SparkConf}
object Aggr {
case class Book(book: Int, reader: Int)
val bookArray = Array(
(2,Book(book = 1, reader = 700)),
(3,Book(book = 2, reader = 710)),
(4,Book(book = 3, reader = 710)),
(2,Book(book = 8, reader = 710)),
(3,Book(book = 1, reader = 720)),
(4,Book(book = 2, reader = 720)),
(4,Book(book = 8, reader = 720)),
(3,Book(book = 3, reader = 730)),
(4,Book(book = 8, reader = 740))
)
def main(args: Array[String]) {
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
// set up environment
val conf = new SparkConf()
.setMaster("local[5]")
.setAppName("Aggr")
.set("spark.executor.memory", "2g")
val sc = new SparkContext(conf)
val books = sc.parallelize(bookArray)
val aggr = books.aggregateByKey((List()[Int], List()[Int]))
({case
((bookList:List[Int],readerList:List[Int]), Book(book, reader)) =>
(bookList ++ List(book), readerList ++ List(reader))
},
{case ((bookLst1:List[Int], readerLst1:List[Int]),
(bookLst2:List[Int], readerLst2:List[Int])
) => (bookLst1 ++ bookLst2, readerLst1 ++ readerLst2) })
}
}
错误:
Error:(36, 44) object Nil does not take type parameters.
val aggr = books.aggregateByKey((List()[Int], List()[Int]))
Error:(37, 6) missing parameter type for expanded function The argument types of an anonymous function must be fully known. (SLS 8.5) Expected type was: ?
({case
^
^
更新
当使用 (List(0), List(0)
初始化累加器时,一切都会编译,但会在结果中插入额外的零。很有趣:
val aggr : RDD[(Int, (List[Int], List[Int]))] = books.aggregateByKey((List(0), List(0))) (
{case
((bookList:List[Int],readerList:List[Int]), Book(book, reader)) =>
(bookList ++ List(book), readerList ++ List(reader))
},
{case ((bookLst1:List[Int], readerLst1:List[Int]),
(bookLst2:List[Int], readerLst2:List[Int])
) => (bookLst1 ++ bookLst2, readerLst1 ++ readerLst2) }
)
这导致以下输出:
[Stage 0:> (0 + 0) / 5](2,(List(0, 1, 0, 8),List(0, 700, 0, 710)))
(3,(List(0, 2, 0, 1, 0, 3),List(0, 710, 0, 720, 0, 730)))
(4,(List(0, 3, 0, 2, 8, 0, 8),List(0, 710, 0, 720, 720, 0, 740)))
假设我可以将空列表作为初始值设定项而不是带零的列表,我当然不会有额外的零,列表会很好地连接起来。
谁能解释一下为什么空列表初始值设定项 (List(), List()
会导致错误并且 (List(0), List(0)
可以编译。是 Scala 错误还是功能?
其实你做的都还行,只是你的indentation/syntax风格有点马虎,你只需要移去一个括号就可以了:
val aggr = books.aggregateByKey((List()[Int], List()[Int]))
({case
进入这个:
val aggr = books.aggregateByKey((List[Int](), List[Int]())) (
{case
这些链接可能会阐明为什么这对您不起作用:
What are the precise rules for when you can omit parenthesis, dots, braces, = (functions), etc.?(第一个答案)
http://docs.scala-lang.org/style/method-invocation.html#suffix-notation
正在回答您的更新 - 您放错了列表的类型声明。如果您将它们声明为 List[Int]()
而不是 List()[Int]
,一切都会有效。编译器错误消息正确地告诉了您问题所在,但不太容易理解。通过将 [Int]
放在末尾,您将类型参数传递给 List()
函数的 result。 List()
的结果是 Nil
- 表示空列表的单例对象 - 它不采用类型参数。
至于为什么 List(0)
也有效 - scala 执行 类型推断,如果可以的话。您已经声明了一个列表的一个元素——它是 0,一个整数,所以它推断这是一个 List[Int]
。但是请注意,这并没有声明一个空列表,而是一个带有单个零的列表。您可能想改用 List[Int]()
。
仅使用 List()
是行不通的,因为 scala 无法推断空列表的类型。