最小化洗牌溢出和洗牌写入

Minimize shuffle spill and shuffle write

我在 HDFS 中有以下管道,我正在 spark 中处理它

输入table:batch, team, user, metric1, metric2

这个 table 可以有每小时批次的用户级别指标。在同一小时内,一个用户可以有多个条目。

级别 1 聚合: 此聚合用于获取每个用户每批次的最新条目

agg(metric1) as user_metric1, agg(metric2) as user_metric2 (group by batch, team, user)

2 级聚合: 获取团队级别指标

agg(user_metric1) as team_metric1, agg(user_metric2) as team_metric2 (group by batch, team)

输入 table 在 HDFS 中的大小为 8gb(snappy parquet 格式)。我的 spark 作业显示 shuffle 写入 40gb,每个执行程序 shuffle 溢出至少 1gb。

为了尽量减少这种情况,如果我在执行聚合之前在用户级别重新分区输入 table,

df = df.repartition('user')

它会提高性能吗?如果我想减少随机播放,应该如何处理这个问题?

同时 运行 关注资源

spark.executor.cores=6
spark.cores.max=48
spark.sql.shuffle.partitions=200

Spark 将数据从一个节点转移到另一个节点,因为资源在集群上分布(输入数据...),这可能会使计算变慢,并且可能会在集群上出现大量网络流量,对于您的情况shuffle 的数量是由于 group by ,如果你根据 goup by 的三列进行重新分区,它将减少 shuffle 的数量,对于 spark 配置,默认 spark.sql.shuffle.partitions 是 200,比方说我们将按原样配置 spark,重新分区需要一些时间,完成后计算会更快:

new_df = df.repartition("batch","team", "user")