在 PySpark 中按 id 范围均匀分布数据集
Distribute dataset evenly by range of id in PySpark
我是 PySpark 的新手,在分区数据方面遇到了挑战。
我有 2 个数据集:
- 具有 ad_id 和一些属性列的广告数据集(非常大)
- 广告交易数据集(较小),包括ad_id和交易日期
在我看来,我只能按 ad_id 进行分区,我的问题是:
我如何为两个数据集按 ad_id 的范围均匀分布数据,以便当我需要计算两个数据集之间的连接时,它会更快?
这是我正在尝试做的事情:
ads.write.partitionBy("ad_id").mode('overwrite').parquet(os.path.join(output_data, 'ads_table'))
谢谢!
使用分桶
如果您使用的是 spark v2.3 及更高版本,则可以使用 bucketing 来避免写入后在连接上发生的随机播放。
通过分桶,您可以根据列(通常是您要加入的列)将数据放入桶中。那么当spark再次从buckets中读取数据时,就不需要进行exchange了。
1。示例数据
交易(事实)
t1.sample(n=5)
ad_id impressions
30 528749
1 552233
30 24298
30 311914
60 41661
名称(维度)
t2.sample(n=5)
ad_id brand_name
1 McDonalds
30 McDonalds
30 Coca-Cola
1 Coca-Cola
30 Levis
2。禁用广播加入
由于一个 table 大而另一个小,您需要禁用 broadcastJoin
。
sqlContext.sql("SET spark.sql.autoBroadcastJoinThreshold = -1")
3。没有分桶
t = spark.createDataFrame(t1)
b = spark.createDataFrame(t2)
t.write.saveAsTable('unbucketed_transactions')
b.write.saveAsTable('unbucketed_brands')
unbucketed_transactions = sqlContext.table("unbucketed_transactions")
unbucketed_brands = sqlContext.table("unbucketed_brands")
unbucketed_transactions.join(unbucketed_brands, 'ad_id').explain()
+- Project [ad_id#1842L, impressions#1843L, brand_name#1847]
+- SortMergeJoin [ad_id#1842L], [ad_id#1846L], Inner
:- Sort [ad_id#1842L ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(ad_id#1842L, 200), true, [id=#1336] <-- 0_0
: +- Project [ad_id#1842L, impressions#1843L]
: +- Filter isnotnull(ad_id#1842L)
: +- FileScan parquet default.unbucketed_transactions
+- Sort [ad_id#1846L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(ad_id#1846L, 200), true, [id=#1337] <-- 0_0
+- Project [ad_id#1846L, brand_name#1847]
+- Filter isnotnull(ad_id#1846L)
+- FileScan parquet default.unbucketed_brands
如您所见,由于未存储的连接,发生了一次交换。
4。使用分桶
# The number 30 tells spark how large the buckets should be.
# The second parameter is what column the bucket should be based on.
unbucketed_transactions.write \
.bucketBy(30,'ad_id') \
.sortBy('ad_id') \
.saveAsTable('bucketed_transactions')
unbucketed_brands.write \
.bucketBy(30,'ad_id') \
.sortBy('ad_id') \
.saveAsTable('bucketed_brands')
transactions = sqlContext.table("bucketed_transactions")
brands = sqlContext.table("bucketed_brands")
transactions.join(brands, 'ad_id').explain()
+- Project [ad_id#1867L, impressions#1868L, brand_name#1872]
+- SortMergeJoin [ad_id#1867L], [ad_id#1871L], Inner
:- Sort [ad_id#1867L ASC NULLS FIRST], false, 0
: +- Project [ad_id#1867L, impressions#1868L]
: +- Filter isnotnull(ad_id#1867L)
: +- FileScan parquet default.bucketed_transactions
+- Sort [ad_id#1871L ASC NULLS FIRST], false, 0
+- Project [ad_id#1871L, brand_name#1872]
+- Filter isnotnull(ad_id#1871L)
+- FileScan parquet default.bucketed_brands
从上面的计划可以看出,没有更多的交换发生。
因此,您将通过避免交换来提高您的表现。
我是 PySpark 的新手,在分区数据方面遇到了挑战。
我有 2 个数据集:
- 具有 ad_id 和一些属性列的广告数据集(非常大)
- 广告交易数据集(较小),包括ad_id和交易日期
在我看来,我只能按 ad_id 进行分区,我的问题是: 我如何为两个数据集按 ad_id 的范围均匀分布数据,以便当我需要计算两个数据集之间的连接时,它会更快?
这是我正在尝试做的事情:
ads.write.partitionBy("ad_id").mode('overwrite').parquet(os.path.join(output_data, 'ads_table'))
谢谢!
使用分桶
如果您使用的是 spark v2.3 及更高版本,则可以使用 bucketing 来避免写入后在连接上发生的随机播放。
通过分桶,您可以根据列(通常是您要加入的列)将数据放入桶中。那么当spark再次从buckets中读取数据时,就不需要进行exchange了。
1。示例数据
交易(事实)
t1.sample(n=5)
ad_id impressions
30 528749
1 552233
30 24298
30 311914
60 41661
名称(维度)
t2.sample(n=5)
ad_id brand_name
1 McDonalds
30 McDonalds
30 Coca-Cola
1 Coca-Cola
30 Levis
2。禁用广播加入
由于一个 table 大而另一个小,您需要禁用 broadcastJoin
。
sqlContext.sql("SET spark.sql.autoBroadcastJoinThreshold = -1")
3。没有分桶
t = spark.createDataFrame(t1)
b = spark.createDataFrame(t2)
t.write.saveAsTable('unbucketed_transactions')
b.write.saveAsTable('unbucketed_brands')
unbucketed_transactions = sqlContext.table("unbucketed_transactions")
unbucketed_brands = sqlContext.table("unbucketed_brands")
unbucketed_transactions.join(unbucketed_brands, 'ad_id').explain()
+- Project [ad_id#1842L, impressions#1843L, brand_name#1847]
+- SortMergeJoin [ad_id#1842L], [ad_id#1846L], Inner
:- Sort [ad_id#1842L ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(ad_id#1842L, 200), true, [id=#1336] <-- 0_0
: +- Project [ad_id#1842L, impressions#1843L]
: +- Filter isnotnull(ad_id#1842L)
: +- FileScan parquet default.unbucketed_transactions
+- Sort [ad_id#1846L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(ad_id#1846L, 200), true, [id=#1337] <-- 0_0
+- Project [ad_id#1846L, brand_name#1847]
+- Filter isnotnull(ad_id#1846L)
+- FileScan parquet default.unbucketed_brands
如您所见,由于未存储的连接,发生了一次交换。
4。使用分桶
# The number 30 tells spark how large the buckets should be.
# The second parameter is what column the bucket should be based on.
unbucketed_transactions.write \
.bucketBy(30,'ad_id') \
.sortBy('ad_id') \
.saveAsTable('bucketed_transactions')
unbucketed_brands.write \
.bucketBy(30,'ad_id') \
.sortBy('ad_id') \
.saveAsTable('bucketed_brands')
transactions = sqlContext.table("bucketed_transactions")
brands = sqlContext.table("bucketed_brands")
transactions.join(brands, 'ad_id').explain()
+- Project [ad_id#1867L, impressions#1868L, brand_name#1872]
+- SortMergeJoin [ad_id#1867L], [ad_id#1871L], Inner
:- Sort [ad_id#1867L ASC NULLS FIRST], false, 0
: +- Project [ad_id#1867L, impressions#1868L]
: +- Filter isnotnull(ad_id#1867L)
: +- FileScan parquet default.bucketed_transactions
+- Sort [ad_id#1871L ASC NULLS FIRST], false, 0
+- Project [ad_id#1871L, brand_name#1872]
+- Filter isnotnull(ad_id#1871L)
+- FileScan parquet default.bucketed_brands
从上面的计划可以看出,没有更多的交换发生。 因此,您将通过避免交换来提高您的表现。