使用 mapPartitions 避免与 groupby 和 count 的混洗
Using mapPartitions to avoid the shuffle with groupby and count
我有以下数据,我需要根据键进行分组并根据键计算数量以监控指标。我可以使用 groupBy 并对该组进行计数,但这涉及一些洗牌。我们可以不洗牌吗?
ID,TempID,PermanantID
----------
xxx, abcd, 12345
xxx, efg, 1345
xxx, ijk, 1534
xxx, lmn, 13455
xxx, null, 12345
xxx, axg, null
yyy, abcd, 12345
yyy, efg, 1345
yyy, ijk, 1534
zzz, lmn, 13455
zzz, abc, null
输出应该是
ID Count1 Count2
----------
XXX 5 5
YYY 3 3
ZZZ 2 1
我可以用 groupBy 和 count
dataframe.groupby("ID").agg(col("TempID").as("Count1"),count(col("PermanantID").as("Count2"))
我们可以使用 mapPartition 来做到这一点吗?
这个问题虽然可以理解,但有缺陷。
mapPartitions 不能直接在数据帧上使用,而是在 RDD 和数据集上使用。
此外,调用 mapPartitions 之前所需的分区和混洗又如何呢?否则,结果将不正确。问题中最初没有提到数据顺序的保证。
因此,我将依赖假定的 groupBy 方法。认为 App 不需要 shuffling 是一种幻想,相反我们可以减少 shuffling,这就是目标。
老问题,但我觉得这个问题有些悬而未决。在回答评论中隐含的问题时,OP 似乎希望首先按分区聚合,然后按组聚合(以避免不惜一切代价进行洗牌)。因此输出将(故意)看起来不像问题中给出的示例输出。
好与不好,这似乎可以实现不洗牌的聚合
import org.apache.spark.sql._
dataframe.
withColumn("partition_id", spark_partition_id).
groupby(col("partition_id"), col("ID")).
agg(
col("TempID").as("Count1"),
count(col("PermanantID").as("Count2")).
drop(col("partition_id"))
我有以下数据,我需要根据键进行分组并根据键计算数量以监控指标。我可以使用 groupBy 并对该组进行计数,但这涉及一些洗牌。我们可以不洗牌吗?
ID,TempID,PermanantID
----------
xxx, abcd, 12345
xxx, efg, 1345
xxx, ijk, 1534
xxx, lmn, 13455
xxx, null, 12345
xxx, axg, null
yyy, abcd, 12345
yyy, efg, 1345
yyy, ijk, 1534
zzz, lmn, 13455
zzz, abc, null
输出应该是
ID Count1 Count2
----------
XXX 5 5
YYY 3 3
ZZZ 2 1
我可以用 groupBy 和 count
dataframe.groupby("ID").agg(col("TempID").as("Count1"),count(col("PermanantID").as("Count2"))
我们可以使用 mapPartition 来做到这一点吗?
这个问题虽然可以理解,但有缺陷。
mapPartitions 不能直接在数据帧上使用,而是在 RDD 和数据集上使用。
此外,调用 mapPartitions 之前所需的分区和混洗又如何呢?否则,结果将不正确。问题中最初没有提到数据顺序的保证。
因此,我将依赖假定的 groupBy 方法。认为 App 不需要 shuffling 是一种幻想,相反我们可以减少 shuffling,这就是目标。
老问题,但我觉得这个问题有些悬而未决。在回答评论中隐含的问题时,OP 似乎希望首先按分区聚合,然后按组聚合(以避免不惜一切代价进行洗牌)。因此输出将(故意)看起来不像问题中给出的示例输出。
好与不好,这似乎可以实现不洗牌的聚合
import org.apache.spark.sql._
dataframe.
withColumn("partition_id", spark_partition_id).
groupby(col("partition_id"), col("ID")).
agg(
col("TempID").as("Count1"),
count(col("PermanantID").as("Count2")).
drop(col("partition_id"))