使用 mapPartitions 避免与 groupby 和 count 的混洗

Using mapPartitions to avoid the shuffle with groupby and count

我有以下数据,我需要根据键进行分组并根据键计算数量以监控指标。我可以使用 groupBy 并对该组进行计数,但这涉及一些洗牌。我们可以不洗牌吗?

ID,TempID,PermanantID
----------

xxx, abcd, 12345

xxx, efg, 1345

xxx, ijk, 1534

xxx, lmn, 13455

xxx, null, 12345

xxx, axg, null

yyy, abcd, 12345

yyy, efg, 1345

yyy, ijk, 1534

zzz, lmn, 13455

zzz, abc, null

输出应该是

ID Count1 Count2
----------
XXX 5 5

YYY 3 3

ZZZ 2 1

我可以用 groupBy 和 count

dataframe.groupby("ID").agg(col("TempID").as("Count1"),count(col("PermanantID").as("Count2"))

我们可以使用 mapPartition 来做到这一点吗?

这个问题虽然可以理解,但有缺陷。

mapPartitions 不能直接在数据帧上使用,而是在 RDD 和数据集上使用。

此外,调用 mapPartitions 之前所需的分区和混洗又如何呢?否则,结果将不正确。问题中最初没有提到数据顺序的保证。

因此,我将依赖假定的 groupBy 方法。认为 App 不需要 shuffling 是一种幻想,相反我们可以减少 shuffling,这就是目标。

老问题,但我觉得这个问题有些悬而未决。在回答评论中隐含的问题时,OP 似乎希望首先按分区聚合,然后按组聚合(以避免不惜一切代价进行洗牌)。因此输出将(故意)看起来不像问题中给出的示例输出。

好与不好,这似乎可以实现不洗牌的聚合

import org.apache.spark.sql._


dataframe.
  withColumn("partition_id", spark_partition_id).
  groupby(col("partition_id"), col("ID")).
  agg(
    col("TempID").as("Count1"),
    count(col("PermanantID").as("Count2")).
  drop(col("partition_id"))