Spark:加入两个相同分区的数据帧时防止shuffle/exchange
Spark: Prevent shuffle/exchange when joining two identically partitioned dataframes
我有两个数据帧 df1
和 df2
,我想在一个名为 visitor_id
的高基数字段上多次加入这些表。我只想执行一次初始洗牌,并在没有 shuffling/exchanging 火花执行器之间的数据的情况下进行所有连接。
为此,我创建了另一个名为 visitor_partition
的列,该列始终为每个 visitor_id 分配一个介于 [0, 1000)
之间的随机值。我使用自定义分区程序来确保 df1
和 df2
被精确分区,以便每个分区只包含来自 visitor_partition
的一个值的行。这次初始重新分区是我唯一一次想要洗牌数据。
我已将每个数据帧保存到 s3 中的镶木地板,按访问者分区进行分区——对于每个数据帧,这将创建 1000 个文件,这些文件按 df1/visitor_partition=0
、df1/visitor_partition=1
...df1/visitor_partition=999
.
现在我从 parquet 加载每个数据帧并通过 df1.createOrReplaceTempView('df1')
将它们注册为临时视图(对于 df2 也是如此)然后 运行 以下查询
SELECT
...
FROM
df1 FULL JOIN df1 ON
df1.visitor_partition = df2.visitor_partition AND
df1.visitor_id = df2.visitor_id
理论上,查询执行计划器应该意识到这里不需要改组。例如,单个执行程序可以从 df1/visitor_partition=1
和 df2/visitor_partition=2
加载数据并加入其中的行。然而,实际上 spark 2.4.4 的查询规划器在这里执行完整的数据洗牌。
有什么方法可以防止这种随机播放发生吗?
您可以使用 bucketBy method of the DataFrameWriter (other documentation).
在下面的示例中,VisitorID 列的值将散列到 500 个桶中。通常,对于连接,Spark 会根据 VisitorID 上的哈希值执行交换阶段。但是,在这种情况下,您已经使用哈希对数据进行了预分区。
inputRdd = sc.parallelize(list((i, i%200) for i in range(0,1000000)))
schema = StructType([StructField("VisitorID", IntegerType(), True),
StructField("visitor_partition", IntegerType(), True)])
inputdf = inputRdd.toDF(schema)
inputdf.write.bucketBy(500, "VisitorID").saveAsTable("bucketed_table")
inputDf1 = spark.sql("select * from bucketed_table")
inputDf2 = spark.sql("select * from bucketed_table")
inputDf3 = inputDf1.alias("df1").join(inputDf2.alias("df2"), col("df1.VisitorID") == col("df2.VisitorID"))
有时 Spark 查询优化器仍然选择广播交换,所以对于我们的示例,让我们禁用自动广播
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
实际计划如下:
== Physical Plan ==
*(3) SortMergeJoin [VisitorID#351], [VisitorID#357], Inner
:- *(1) Sort [VisitorID#351 ASC NULLS FIRST], false, 0
: +- *(1) Project [VisitorID#351, visitor_partition#352]
: +- *(1) Filter isnotnull(VisitorID#351)
: +- *(1) FileScan parquet default.bucketed_6[VisitorID#351,visitor_partition#352] Batched: true, DataFilters: [isnotnull(VisitorID#351)], Format: Parquet, Location: InMemoryFileIndex[dbfs:/user/hive/warehouse/bucketed_6], PartitionFilters: [], PushedFilters: [IsNotNull(VisitorID)], ReadSchema: struct<VisitorID:int,visitor_partition:int>, SelectedBucketsCount: 500 out of 500
+- *(2) Sort [VisitorID#357 ASC NULLS FIRST], false, 0
+- *(2) Project [VisitorID#357, visitor_partition#358]
+- *(2) Filter isnotnull(VisitorID#357)
+- *(2) FileScan parquet default.bucketed_6[VisitorID#357,visitor_partition#358] Batched: true, DataFilters: [isnotnull(VisitorID#357)], Format: Parquet, Location: InMemoryFileIndex[dbfs:/user/hive/warehouse/bucketed_6], PartitionFilters: [], PushedFilters: [IsNotNull(VisitorID)], ReadSchema: struct<VisitorID:int,visitor_partition:int>, SelectedBucketsCount: 500 out of 500
做类似的事情:
inputdf.write.partitionBy("visitor_partition").saveAsTable("partitionBy_2")
确实为每个分区创建了一个文件夹结构。但它不起作用,因为 Spark 连接基于哈希并且无法利用您的自定义结构。
编辑:我误解了你的例子。我相信您说的是 partitionBy 之类的东西,而不是之前版本中提到的重新分区。
我有两个数据帧 df1
和 df2
,我想在一个名为 visitor_id
的高基数字段上多次加入这些表。我只想执行一次初始洗牌,并在没有 shuffling/exchanging 火花执行器之间的数据的情况下进行所有连接。
为此,我创建了另一个名为 visitor_partition
的列,该列始终为每个 visitor_id 分配一个介于 [0, 1000)
之间的随机值。我使用自定义分区程序来确保 df1
和 df2
被精确分区,以便每个分区只包含来自 visitor_partition
的一个值的行。这次初始重新分区是我唯一一次想要洗牌数据。
我已将每个数据帧保存到 s3 中的镶木地板,按访问者分区进行分区——对于每个数据帧,这将创建 1000 个文件,这些文件按 df1/visitor_partition=0
、df1/visitor_partition=1
...df1/visitor_partition=999
.
现在我从 parquet 加载每个数据帧并通过 df1.createOrReplaceTempView('df1')
将它们注册为临时视图(对于 df2 也是如此)然后 运行 以下查询
SELECT
...
FROM
df1 FULL JOIN df1 ON
df1.visitor_partition = df2.visitor_partition AND
df1.visitor_id = df2.visitor_id
理论上,查询执行计划器应该意识到这里不需要改组。例如,单个执行程序可以从 df1/visitor_partition=1
和 df2/visitor_partition=2
加载数据并加入其中的行。然而,实际上 spark 2.4.4 的查询规划器在这里执行完整的数据洗牌。
有什么方法可以防止这种随机播放发生吗?
您可以使用 bucketBy method of the DataFrameWriter (other documentation).
在下面的示例中,VisitorID 列的值将散列到 500 个桶中。通常,对于连接,Spark 会根据 VisitorID 上的哈希值执行交换阶段。但是,在这种情况下,您已经使用哈希对数据进行了预分区。
inputRdd = sc.parallelize(list((i, i%200) for i in range(0,1000000)))
schema = StructType([StructField("VisitorID", IntegerType(), True),
StructField("visitor_partition", IntegerType(), True)])
inputdf = inputRdd.toDF(schema)
inputdf.write.bucketBy(500, "VisitorID").saveAsTable("bucketed_table")
inputDf1 = spark.sql("select * from bucketed_table")
inputDf2 = spark.sql("select * from bucketed_table")
inputDf3 = inputDf1.alias("df1").join(inputDf2.alias("df2"), col("df1.VisitorID") == col("df2.VisitorID"))
有时 Spark 查询优化器仍然选择广播交换,所以对于我们的示例,让我们禁用自动广播
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
实际计划如下:
== Physical Plan ==
*(3) SortMergeJoin [VisitorID#351], [VisitorID#357], Inner
:- *(1) Sort [VisitorID#351 ASC NULLS FIRST], false, 0
: +- *(1) Project [VisitorID#351, visitor_partition#352]
: +- *(1) Filter isnotnull(VisitorID#351)
: +- *(1) FileScan parquet default.bucketed_6[VisitorID#351,visitor_partition#352] Batched: true, DataFilters: [isnotnull(VisitorID#351)], Format: Parquet, Location: InMemoryFileIndex[dbfs:/user/hive/warehouse/bucketed_6], PartitionFilters: [], PushedFilters: [IsNotNull(VisitorID)], ReadSchema: struct<VisitorID:int,visitor_partition:int>, SelectedBucketsCount: 500 out of 500
+- *(2) Sort [VisitorID#357 ASC NULLS FIRST], false, 0
+- *(2) Project [VisitorID#357, visitor_partition#358]
+- *(2) Filter isnotnull(VisitorID#357)
+- *(2) FileScan parquet default.bucketed_6[VisitorID#357,visitor_partition#358] Batched: true, DataFilters: [isnotnull(VisitorID#357)], Format: Parquet, Location: InMemoryFileIndex[dbfs:/user/hive/warehouse/bucketed_6], PartitionFilters: [], PushedFilters: [IsNotNull(VisitorID)], ReadSchema: struct<VisitorID:int,visitor_partition:int>, SelectedBucketsCount: 500 out of 500
做类似的事情:
inputdf.write.partitionBy("visitor_partition").saveAsTable("partitionBy_2")
确实为每个分区创建了一个文件夹结构。但它不起作用,因为 Spark 连接基于哈希并且无法利用您的自定义结构。
编辑:我误解了你的例子。我相信您说的是 partitionBy 之类的东西,而不是之前版本中提到的重新分区。