使用 Spark SQL GROUP BY 在 DataFrame 上进行高效的 PairRDD 操作

Efficient PairRDD operations on DataFrame with Spark SQL GROUP BY

这个问题是关于 DataFrameRDD 在聚合操作方面的二元性。在 Spark SQL 中,可以使用 table 为自定义聚合生成 UDF,但创建其中一个 UDF 通常明显不如使用 RDD 可用的聚合函数那么用户友好,特别是如果 table 输出是不需要。

是否有一种有效的方法可以将诸如 aggregateByKey 之类的成对 RDD 操作应用于已使用 GROUP BY 分组或使用 ORDERED BY 排序的 DataFrame?

通常,需要明确的 map 步骤来创建键值元组,例如 dataFrame.rdd.map(row => (row.getString(row.fieldIndex("category")), row).aggregateByKey(...)。这可以避免吗?

不是真的。虽然 DataFrames 可以转换为 RDDs,反之亦然,但这是相对复杂的操作,并且像 DataFrame.groupBy 这样的方法与 RDD 上的对应方法没有相同的语义。

最接近的是 Spark 1.6.0 中引入的 。它通过自己的一组方法(包括 reducecogroupmapGroups:

case class Record(id: Long, key: String, value: Double)

val df = sc.parallelize(Seq(
    (1L, "foo", 3.0), (2L, "bar", 5.6),
    (3L, "foo", -1.0), (4L, "bar", 10.0)
)).toDF("id", "key", "value")

val ds = df.as[Record]
ds.groupBy($"key").reduce((x, y) => if (x.id < y.id) x else y).show

// +-----+-----------+
// |   _1|         _2|
// +-----+-----------+
// |[bar]|[2,bar,5.6]|
// |[foo]|[1,foo,3.0]|
// +-----+-----------+

在某些特定情况下,可以利用 Orderable 语义来使用 structsarrays 对数据进行分组和处理。您会在

中找到示例