最小化洗牌溢出和洗牌写入
Minimize shuffle spill and shuffle write
我在 HDFS 中有以下管道,我正在 spark 中处理它
输入table:batch, team, user, metric1, metric2
这个 table 可以有每小时批次的用户级别指标。在同一小时内,一个用户可以有多个条目。
级别 1 聚合: 此聚合用于获取每个用户每批次的最新条目
agg(metric1) as user_metric1, agg(metric2) as user_metric2 (group by batch, team, user)
2 级聚合: 获取团队级别指标
agg(user_metric1) as team_metric1, agg(user_metric2) as team_metric2 (group by batch, team)
输入 table 在 HDFS 中的大小为 8gb(snappy parquet 格式)。我的 spark 作业显示 shuffle 写入 40gb,每个执行程序 shuffle 溢出至少 1gb。
为了尽量减少这种情况,如果我在执行聚合之前在用户级别重新分区输入 table,
df = df.repartition('user')
它会提高性能吗?如果我想减少随机播放,应该如何处理这个问题?
同时 运行 关注资源
spark.executor.cores=6
spark.cores.max=48
spark.sql.shuffle.partitions=200
Spark 将数据从一个节点转移到另一个节点,因为资源在集群上分布(输入数据...),这可能会使计算变慢,并且可能会在集群上出现大量网络流量,对于您的情况shuffle 的数量是由于 group by ,如果你根据 goup by 的三列进行重新分区,它将减少 shuffle 的数量,对于 spark 配置,默认 spark.sql.shuffle.partitions 是 200,比方说我们将按原样配置 spark,重新分区需要一些时间,完成后计算会更快:
new_df = df.repartition("batch","team", "user")
我在 HDFS 中有以下管道,我正在 spark 中处理它
输入table:batch, team, user, metric1, metric2
这个 table 可以有每小时批次的用户级别指标。在同一小时内,一个用户可以有多个条目。
级别 1 聚合: 此聚合用于获取每个用户每批次的最新条目
agg(metric1) as user_metric1, agg(metric2) as user_metric2 (group by batch, team, user)
2 级聚合: 获取团队级别指标
agg(user_metric1) as team_metric1, agg(user_metric2) as team_metric2 (group by batch, team)
输入 table 在 HDFS 中的大小为 8gb(snappy parquet 格式)。我的 spark 作业显示 shuffle 写入 40gb,每个执行程序 shuffle 溢出至少 1gb。
为了尽量减少这种情况,如果我在执行聚合之前在用户级别重新分区输入 table,
df = df.repartition('user')
它会提高性能吗?如果我想减少随机播放,应该如何处理这个问题?
同时 运行 关注资源
spark.executor.cores=6
spark.cores.max=48
spark.sql.shuffle.partitions=200
Spark 将数据从一个节点转移到另一个节点,因为资源在集群上分布(输入数据...),这可能会使计算变慢,并且可能会在集群上出现大量网络流量,对于您的情况shuffle 的数量是由于 group by ,如果你根据 goup by 的三列进行重新分区,它将减少 shuffle 的数量,对于 spark 配置,默认 spark.sql.shuffle.partitions 是 200,比方说我们将按原样配置 spark,重新分区需要一些时间,完成后计算会更快:
new_df = df.repartition("batch","team", "user")