How to solve "aggregateByKey is not a member of org.apache.spark.sql.Dataset" in Spark?
I am trying this example:
https://backtobazics.com/big-data/spark/apache-spark-aggregatebykey-example/
but instead of an RDD I am using a DataFrame. I tried the following:
val aggrRDD = student_df.map(r => (r.getString(0), (r.getString(1), r.getInt(2))))
.aggregateByKey(zeroVal)(seqOp, combOp)
It is part of this snippet:
val student_df = sc.parallelize(Array(
("Joseph", "Maths", 83), ("Joseph", "Physics", 74), ("Joseph", "Chemistry", 91), ("Joseph", "Biology", 82),
("Jimmy", "Maths", 69), ("Jimmy", "Physics", 62), ("Jimmy", "Chemistry", 97), ("Jimmy", "Biology", 80),
("Tina", "Maths", 78), ("Tina", "Physics", 73), ("Tina", "Chemistry", 68), ("Tina", "Biology", 87),
("Thomas", "Maths", 87), ("Thomas", "Physics", 93), ("Thomas", "Chemistry", 91), ("Thomas", "Biology", 74),
("Cory", "Maths", 56), ("Cory", "Physics", 65), ("Cory", "Chemistry", 71), ("Cory", "Biology", 68),
("Jackeline", "Maths", 86), ("Jackeline", "Physics", 62), ("Jackeline", "Chemistry", 75), ("Jackeline", "Biology", 83),
("Juan", "Maths", 63), ("Juan", "Physics", 69), ("Juan", "Chemistry", 64), ("Juan", "Biology", 60)), 3).toDF("student", "subject", "marks")
def seqOp = (accumulator: Int, element: (String, Int)) =>
if(accumulator > element._2) accumulator else element._2
def combOp = (accumulator1: Int, accumulator2: Int) =>
if(accumulator1 > accumulator2) accumulator1 else accumulator2
val zeroVal = 0
val aggrRDD = student_df.map(r => (r.getString(0), (r.getString(1), r.getInt(2))))
.aggregateByKey(zeroVal)(seqOp, combOp)
I get this error:
error: value aggregateByKey is not a member of org.apache.spark.sql.Dataset[(String, (String, Int))]
possible cause: maybe a semicolon is missing before `value aggregateByKey'?
What am I doing wrong here? How can I do this with a DataFrame or a Dataset?
aggregateByKey is only defined for pair RDDs (it comes from PairRDDFunctions), not for Dataset, so call rdd on student_df before the map:
val aggrRDD = student_df.rdd.map(r => (r.getString(0), (r.getString(1), r.getInt(2))))
.aggregateByKey(zeroVal)(seqOp, combOp)
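If you would rather stay in the DataFrame/Dataset API instead of dropping to an RDD, the same result (maximum marks per student) can be expressed with groupBy/agg or with the typed groupByKey. A minimal sketch, assuming the SparkSession implicits are in scope (the toDF call above already requires them); the variable names are only illustrative:
import org.apache.spark.sql.functions.max
// Untyped DataFrame API: maximum marks per student
val maxMarksDF = student_df
  .groupBy("student")
  .agg(max("marks").as("max_marks"))
maxMarksDF.show()
// Typed Dataset API: key by student, keep only the marks, reduce to the max per key
val maxMarksDS = student_df
  .as[(String, String, Int)]
  .groupByKey(_._1)
  .mapValues(_._3)
  .reduceGroups((a, b) => math.max(a, b))
maxMarksDS.show()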