用于 Spark 高效连接的分区数据 dataframe/dataset

Partition data for efficient joining for Spark dataframe/dataset

我需要 join 基于一些共享键列的许多 DataFrame。对于键值 RDD,可以指定一个分区器,以便将具有相同键的数据点洗牌到同一个执行器,这样加入效率更高(如果在 join 之前有洗牌相关操作)。可以在 Spark DataFrames 或 DataSets 上做同样的事情吗?

可以使用DataFrame/DataSet API 使用repartition 方法。使用此方法,您可以指定一个或多个列用于数据分区,例如

val df2 = df.repartition($"colA", $"colB")

也可以在同一个命令中同时指定想要的分区数,

val df2 = df.repartition(10, $"colA", $"colB")

注意:这并不能保证数据帧的分区将位于同一个节点上,只是分区是以相同的方式完成的。

如果你知道你会多次加入它,你可以 repartition 在加载数据帧后

val users = spark.read.load("/path/to/users").repartition('userId)

val joined1 = users.join(addresses, "userId")
joined1.show() // <-- 1st shuffle for repartition

val joined2 = users.join(salary, "userId")
joined2.show() // <-- skips shuffle for users since it's already been repartitioned

所以它会随机播放一次数据,然后在后续加入时重复使用随机播放文件。

但是,如果您知道您将重复洗牌某些键上的数据,最好的办法是将数据保存为分桶表。这将写出已经预先哈希分区的数据,因此当您读入表并加入它们时,您可以避免混洗。您可以这样做:

// you need to pick a number of buckets that makes sense for your data
users.bucketBy(50, "userId").saveAsTable("users")
addresses.bucketBy(50, "userId").saveAsTable("addresses")

val users = spark.read.table("users")
val addresses = spark.read.table("addresses")

val joined = users.join(addresses, "userId")
joined.show() // <-- no shuffle since tables are co-partitioned

为了避免随机播放,表必须使用相同的分桶(例如相同数量的分桶和分桶列的连接)。