在 PySpark 中按 id 范围均匀分布数据集

Distribute dataset evenly by range of id in PySpark

我是 PySpark 的新手,在分区数据方面遇到了挑战。

我有 2 个数据集:

在我看来,我只能按 ad_id 进行分区,我的问题是: 我如何为两个数据集按 ad_id 的范围均匀分布数据,以便当我需要计算两个数据集之间的连接时,它会更快?

这是我正在尝试做的事情:

ads.write.partitionBy("ad_id").mode('overwrite').parquet(os.path.join(output_data, 'ads_table'))

谢谢!

使用分桶

如果您使用的是 spark v2.3 及更高版本,则可以使用 bucketing 来避免写入后在连接上发生的随机播放。

通过分桶,您可以根据列(通常是您要加入的列)将数据放入桶中。那么当spark再次从buckets中读取数据时,就不需要进行exchange了。

1。示例数据

交易(事实)

t1.sample(n=5)

ad_id     impressions
30        528749
1         552233
30        24298
30        311914
60        41661

名称(维度)

t2.sample(n=5)

ad_id     brand_name
1         McDonalds
30        McDonalds
30        Coca-Cola
1         Coca-Cola
30        Levis

2。禁用广播加入

由于一个 table 大而另一个小,您需要禁用 broadcastJoin

sqlContext.sql("SET spark.sql.autoBroadcastJoinThreshold = -1")

3。没有分桶

t = spark.createDataFrame(t1)
b = spark.createDataFrame(t2)


t.write.saveAsTable('unbucketed_transactions')
b.write.saveAsTable('unbucketed_brands')

unbucketed_transactions = sqlContext.table("unbucketed_transactions")
unbucketed_brands = sqlContext.table("unbucketed_brands")


unbucketed_transactions.join(unbucketed_brands, 'ad_id').explain()



+- Project [ad_id#1842L, impressions#1843L, brand_name#1847]
   +- SortMergeJoin [ad_id#1842L], [ad_id#1846L], Inner
      :- Sort [ad_id#1842L ASC NULLS FIRST], false, 0
      :  +- Exchange hashpartitioning(ad_id#1842L, 200), true, [id=#1336]     <-- 0_0
      :     +- Project [ad_id#1842L, impressions#1843L]
      :        +- Filter isnotnull(ad_id#1842L)
      :           +- FileScan parquet default.unbucketed_transactions
      +- Sort [ad_id#1846L ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(ad_id#1846L, 200), true, [id=#1337]     <-- 0_0 
            +- Project [ad_id#1846L, brand_name#1847]
               +- Filter isnotnull(ad_id#1846L)
                  +- FileScan parquet default.unbucketed_brands


如您所见,由于未存储的连接,发生了一次交换。

4。使用分桶

# The number 30 tells spark how large the buckets should be. 
# The second parameter is what column the bucket should be based on.

unbucketed_transactions.write \
.bucketBy(30,'ad_id') \
.sortBy('ad_id') \
.saveAsTable('bucketed_transactions')


unbucketed_brands.write \
.bucketBy(30,'ad_id') \
.sortBy('ad_id') \
.saveAsTable('bucketed_brands')

transactions = sqlContext.table("bucketed_transactions")
brands = sqlContext.table("bucketed_brands")

transactions.join(brands, 'ad_id').explain()


+- Project [ad_id#1867L, impressions#1868L, brand_name#1872]
   +- SortMergeJoin [ad_id#1867L], [ad_id#1871L], Inner
      :- Sort [ad_id#1867L ASC NULLS FIRST], false, 0
      :  +- Project [ad_id#1867L, impressions#1868L]
      :     +- Filter isnotnull(ad_id#1867L)
      :        +- FileScan parquet default.bucketed_transactions
      +- Sort [ad_id#1871L ASC NULLS FIRST], false, 0
         +- Project [ad_id#1871L, brand_name#1872]
            +- Filter isnotnull(ad_id#1871L)
               +- FileScan parquet default.bucketed_brands

从上面的计划可以看出,没有更多的交换发生。 因此,您将通过避免交换来提高您的表现。